blob: 91ff454c9542b23fe54c6fda54b1f7eb61d79b48 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Cluster.h"
#include <signal.h>
#include <future>
#include <boost/filesystem.hpp>
Locator::Locator(Cluster &cluster, std::vector<Locator> &locators,
std::string name, uint16_t jmxManagerPort, bool useIPv6)
: cluster_(cluster),
name_(std::move(name)),
locators_(locators),
jmxManagerPort_(jmxManagerPort) {
auto hostname = "localhost";
if (useIPv6) {
hostname = "ip6-localhost";
}
auto port = Framework::getAvailablePort();
locatorAddress_ = LocatorAddress{hostname, port};
}
Locator::~Locator() {
try {
if (started_) {
stop();
}
} catch (...) {
}
}
Locator::Locator(Locator &&move)
: cluster_(move.cluster_),
name_(move.name_),
locators_(move.locators_),
locatorAddress_(move.locatorAddress_),
jmxManagerPort_(move.jmxManagerPort_),
started_(move.started_) {
move.started_ = false;
}
const LocatorAddress &Locator::getAddress() const { return locatorAddress_; }
void Locator::start() {
if (started_) return;
auto safeName = name_;
std::replace(safeName.begin(), safeName.end(), '/', '_');
if (boost::filesystem::is_regular_file(name_ + "/vf.gf.locator.pid")) {
cluster_.getGfsh().stop().locator().withDir(name_).execute();
}
auto locator = cluster_.getGfsh()
.start()
.locator()
.withLogLevel("INFO")
.withDir(name_)
.withName(safeName)
.withBindAddress(locatorAddress_.address)
.withPort(locatorAddress_.port)
.withMaxHeap("256m")
.withJmxManagerPort(jmxManagerPort_)
.withHttpServicePort(0)
.withClasspath(cluster_.getClasspath())
.withSecurityManager(cluster_.getSecurityManager())
.withPreferIPv6(cluster_.getUseIPv6())
.withJmxManagerStart(true);
if (cluster_.useSsl()) {
locator.withConnect(false)
.withSslEnabledComponents("all")
.withSslRequireAuthentication(cluster_.requireSslAuthentication())
.withSslKeystore(cluster_.keystore())
.withSslTruststore(cluster_.truststore())
.withSslKeystorePassword(cluster_.keystorePassword())
.withSslTruststorePassword(cluster_.truststorePassword());
}
locator.execute(cluster_.getUser(), cluster_.getPassword(),
cluster_.keystore(), cluster_.truststore(),
cluster_.keystorePassword(), cluster_.truststorePassword());
auto connect =
cluster_.getGfsh().connect().withJmxManager(cluster_.getJmxManager());
if (!cluster_.getUser().empty()) {
connect.withUser(cluster_.getUser()).withPassword(cluster_.getPassword());
}
if (cluster_.useSsl()) {
connect.withUseSsl(true)
.withKeystore(cluster_.keystore())
.withTruststore(cluster_.truststore())
.withKeystorePassword(cluster_.keystorePassword())
.withTruststorePassword(cluster_.truststorePassword());
}
connect.execute();
started_ = true;
}
void Locator::stop() {
cluster_.getGfsh().stop().locator().withDir(name_).execute();
started_ = false;
}
Server::Server(Cluster &cluster, std::vector<Locator> &locators,
std::string name, std::string xmlFile, bool useIPv6)
: cluster_(cluster),
locators_(locators),
name_(std::move(name)),
xmlFile_(xmlFile) {
auto hostname = "localhost";
if (useIPv6) {
hostname = "ip6-localhost";
}
auto port = static_cast<uint16_t>(0);
serverAddress_ = ServerAddress{hostname, port};
}
std::string Server::getCacheXMLFile() { return xmlFile_; }
Server::~Server() {
try {
if (started_) {
stop();
}
} catch (...) {
}
}
Server::Server(Server &&move)
: cluster_(move.cluster_),
locators_(move.locators_),
serverAddress_(move.serverAddress_),
started_(move.started_),
name_(move.name_),
xmlFile_(move.xmlFile_) {
move.started_ = false;
}
void Server::start() {
auto safeName = name_;
std::replace(safeName.begin(), safeName.end(), '/', '_');
auto server =
cluster_.getGfsh()
.start()
.server()
.withDir(name_)
.withName(safeName)
.withBindAddress(serverAddress_.address)
.withPort(serverAddress_.port)
.withMaxHeap("1g")
.withLocators(locators_.front().getAddress().address + "[" +
std::to_string(locators_.front().getAddress().port) +
"]")
.withClasspath(cluster_.getClasspath())
.withSecurityManager(cluster_.getSecurityManager())
.withCacheXMLFile(getCacheXMLFile())
.withPreferIPv6(cluster_.getUseIPv6());
if (!cluster_.getUser().empty()) {
server.withUser(cluster_.getUser()).withPassword(cluster_.getPassword());
}
if (cluster_.useSsl()) {
server.withSslEnabledComponents("all")
.withSslRquireAuthentication(cluster_.requireSslAuthentication())
.withSslKeystore(cluster_.keystore())
.withSslTruststore(cluster_.truststore())
.withSslKeystorePassword(cluster_.keystorePassword())
.withSslTruststorePassword(cluster_.truststorePassword());
}
server.execute();
started_ = true;
}
void Server::stop() {
cluster_.getGfsh().stop().server().withDir(name_).execute();
started_ = false;
}
Cluster::Cluster(LocatorCount initialLocators, ServerCount initialServers,
UseIpv6 useIPv6)
: Cluster(
Name(std::string(::testing::UnitTest::GetInstance()
->current_test_info()
->test_case_name()) +
"/" +
::testing::UnitTest::GetInstance()->current_test_info()->name()),
initialLocators, initialServers, useIPv6) {}
Cluster::Cluster(LocatorCount initialLocators, ServerCount initialServers)
: Cluster(
Name(std::string(::testing::UnitTest::GetInstance()
->current_test_info()
->test_case_name()) +
"/" +
::testing::UnitTest::GetInstance()->current_test_info()->name()),
initialLocators, initialServers) {}
Cluster::Cluster(LocatorCount initialLocators, ServerCount initialServers,
CacheXMLFiles cacheXMLFiles)
: name_(std::string(::testing::UnitTest::GetInstance()
->current_test_info()
->test_case_name()) +
"/" +
::testing::UnitTest::GetInstance()->current_test_info()->name()),
initialLocators_(initialLocators.get()),
initialServers_(initialServers.get()),
jmxManagerPort_(Framework::getAvailablePort()) {
removeServerDirectory();
cacheXMLFiles_ = cacheXMLFiles.get();
}
Cluster::Cluster(Name name, LocatorCount initialLocators,
ServerCount initialServers, UseIpv6 useIPv6)
: Cluster(Name(name.get()), Classpath(""), SecurityManager(""), User(""),
Password(""), initialLocators, initialServers, CacheXMLFiles({}),
useIPv6) {}
Cluster::Cluster(Name name, LocatorCount initialLocators,
ServerCount initialServers)
: Cluster(Name(name.get()), Classpath(""), SecurityManager(""), User(""),
Password(""), initialLocators, initialServers, CacheXMLFiles({}),
UseIpv6(false)) {}
Cluster::Cluster(Name name, Classpath classpath,
SecurityManager securityManager, User user, Password password,
LocatorCount initialLocators, ServerCount initialServers,
CacheXMLFiles cacheXMLFiles, UseIpv6 useIPv6)
: name_(name.get()),
classpath_(classpath.get()),
securityManager_(securityManager.get()),
user_(user.get()),
password_(password.get()),
initialLocators_(initialLocators.get()),
initialServers_(initialServers.get()) {
jmxManagerPort_ = Framework::getAvailablePort();
cacheXMLFiles_ = cacheXMLFiles.get();
useIPv6_ = useIPv6.get();
removeServerDirectory();
}
Cluster::Cluster(Name name, Classpath classpath,
SecurityManager securityManager, User user, Password password,
LocatorCount initialLocators, ServerCount initialServers)
: name_(name.get()),
classpath_(classpath.get()),
securityManager_(securityManager.get()),
user_(user.get()),
password_(password.get()),
initialLocators_(initialLocators.get()),
initialServers_(initialServers.get()) {
jmxManagerPort_ = Framework::getAvailablePort();
removeServerDirectory();
}
Cluster::~Cluster() {
try {
if (started_) {
stop();
}
} catch (...) {
}
}
void Cluster::removeServerDirectory() {
boost::filesystem::path serverDir = boost::filesystem::relative(name_);
boost::filesystem::remove_all(serverDir);
}
apache::geode::client::Cache Cluster::createCache() { return createCache({}); }
apache::geode::client::Cache Cluster::createCache(
const std::unordered_map<std::string, std::string> &properties) {
return createCache(properties, false);
}
apache::geode::client::Cache Cluster::createCache(
const std::unordered_map<std::string, std::string> &properties,
bool subscriptionEnabled) {
using apache::geode::client::CacheFactory;
CacheFactory cacheFactory;
for (auto &&property : properties) {
cacheFactory.set(property.first, property.second);
}
auto cache = cacheFactory.set("log-level", "none")
.set("statistic-sampling-enabled", "false")
.create();
auto poolFactory =
cache.getPoolManager().createFactory().setSubscriptionEnabled(
subscriptionEnabled);
applyLocators(poolFactory);
poolFactory.create("default");
return cache;
}
void Cluster::applyLocators(apache::geode::client::PoolFactory &poolFactory) {
for (const auto &locator : locators_) {
poolFactory.addLocator(locator.getAddress().address,
locator.getAddress().port);
}
}
Gfsh &Cluster::getGfsh() { return gfsh_; }
std::vector<Server> &Cluster::getServers() { return servers_; }
std::vector<Locator> &Cluster::getLocators() { return locators_; }
std::string &Cluster::getClasspath() { return classpath_; }
std::string &Cluster::getSecurityManager() { return securityManager_; }
std::string &Cluster::getUser() { return user_; }
std::string &Cluster::getPassword() { return password_; }
std::vector<std::string> &Cluster::getCacheXMLFiles() { return cacheXMLFiles_; }
bool Cluster::getUseIPv6() { return useIPv6_; }
void Cluster::start() { start(std::function<void()>()); }
void Cluster::start(std::function<void()> extraGfshCommands) {
locators_.reserve(initialLocators_);
for (size_t i = 0; i < initialLocators_; i++) {
locators_.push_back({*this, locators_,
name_ + "/locator/" + std::to_string(i),
jmxManagerPort_, getUseIPv6()});
}
servers_.reserve(initialServers_);
std::string xmlFile;
for (size_t i = 0; i < initialServers_; i++) {
xmlFile = (cacheXMLFiles_.size() == 0)
? ""
: cacheXMLFiles_.size() == 1 ? cacheXMLFiles_[1]
: cacheXMLFiles_[i];
servers_.push_back({*this, locators_,
name_ + "/server/" + std::to_string(i), xmlFile,
getUseIPv6()});
}
startLocators();
if (extraGfshCommands) {
extraGfshCommands();
}
startServers();
started_ = true;
}
std::string Cluster::getJmxManager() {
return locators_.begin()->getAddress().address + "[" +
std::to_string(jmxManagerPort_) + "]";
}
uint16_t Cluster::getLocatorPort() {
return locators_.begin()->getAddress().port;
}
void Cluster::startServers() {
std::vector<std::future<void>> futures;
for (auto &server : this->servers_) {
futures.push_back(std::async(std::launch::async, [&] { server.start(); }));
}
for (auto &future : futures) {
future.get();
}
}
void Cluster::startLocators() {
std::vector<std::future<void>> futures;
for (auto &locator : locators_) {
futures.push_back(std::async(std::launch::async, [&] { locator.start(); }));
}
// TODO hack until there is a way to either tell servers to retry or wait
// for single future.
for (auto &future : futures) {
future.get();
}
}
void Cluster::stop() {
std::vector<std::future<void>> futures;
for (auto &server : servers_) {
futures.push_back(std::async(std::launch::async, [&] { server.stop(); }));
}
for (auto &locator : locators_) {
futures.push_back(std::async(std::launch::async, [&] { locator.stop(); }));
}
for (auto &future : futures) {
future.wait();
}
started_ = false;
}
void Cluster::useSsl(const bool requireSslAuthentication,
const std::string keystore, const std::string truststore,
const std::string keystorePassword,
const std::string truststorePassword) {
useSsl_ = true;
requireSslAuthentication_ = requireSslAuthentication;
keystore_ = keystore;
truststore_ = truststore;
keystorePassword_ = keystorePassword;
truststorePassword_ = truststorePassword;
}
bool Cluster::useSsl() { return useSsl_; }
void Cluster::usePropertiesFile(const std::string propertiesFile) {
usePropertiesFile_ = true;
propertiesFile_ = propertiesFile;
}
void Cluster::useSecurityPropertiesFile(const std::string securityPropertiesFile) {
useSecurityPropertiesFile_ = true;
securityPropertiesFile_ = securityPropertiesFile;
}
void Cluster::useHostNameForClients(
const std::string hostName) {
usePropertiesFile_ = true;
hostName_ = hostName;
}
bool Cluster::usePropertiesFile() { return usePropertiesFile_; }
bool Cluster::useSecurityPropertiesFile() { return useSecurityPropertiesFile_; }
bool Cluster::useHostNameForClients() { return useHostNameForClients_; }
bool Cluster::requireSslAuthentication() { return requireSslAuthentication_; }
std::string Cluster::keystore() { return keystore_; }
std::string Cluster::truststore() { return truststore_; }
std::string Cluster::keystorePassword() { return keystorePassword_; }
std::string Cluster::truststorePassword() { return truststorePassword_; }