blob: 35a06e53ddb7873f03e46004a808cdfa701cfaa2 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifdef _WIN32
// Enable memory leak detection
#define _CRTDBG_MAP_ALLOC
#include <crtdbg.h>
#include <stdlib.h>
// Enable memory leak detection for new operator
#ifdef _DEBUG
#ifndef DBG_NEW
#define DBG_NEW new (_NORMAL_BLOCK, __FILE__, __LINE__)
#define new DBG_NEW
#endif
#endif
#else
#include <unistd.h>
#endif
#include "bridge.hpp"
#ifdef CASS_USE_LIBSSH2
#include <libssh2.h>
#define LIBSSH2_INIT_ALL 0
#ifndef LIBSSH2_NO_OPENSSL
#ifdef OPENSSL_CLEANUP
#define PID_UNKNOWN 0
#include <openssl/conf.h>
#include <openssl/engine.h>
#include <openssl/rand.h>
#include <openssl/ssl.h>
#endif
#endif
#endif
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <algorithm>
#include <fstream>
#include <iostream>
#include <sstream>
// Create simple console logging functions
// LOG_MESSAGE writes "ccm> <message>" followed by a newline to std::cerr when
// is_output evaluates to true; message may be a chain of stream insertions.
// NOTE(review): the macro expands to a bare `if` statement, so placing it in
// an unbraced if/else can make a following `else` bind to the macro's `if`.
#define LOG_MESSAGE(message, is_output) \
if (is_output) { \
std::cerr << "ccm> " << message << std::endl; \
}
// LOG only emits when is_verbose_ (looked up at the point of use) is true
#define LOG(message) LOG_MESSAGE(message, is_verbose_)
// LOG_ERROR always emits
#define LOG_ERROR(message) LOG_MESSAGE(message, true)
// Create FALSE/TRUE defines for easier code readability
#ifndef FALSE
#define FALSE 0
#endif
#ifndef TRUE
#define TRUE 1
#endif
// Whitespace characters (space, form feed, newline, carriage return, tab,
// vertical tab); presumably the set stripped by trim() - confirm at its definition
#define TRIM_DELIMETERS " \f\n\r\t\v"
// Cassandra port numbers (not referenced in the portion of the file that uses
// the cluster/node commands; presumably used by the node availability checks)
#define CASSANDRA_BINARY_PORT 9042
#define CASSANDRA_STORAGE_PORT 7000
#define CASSANDRA_THRIFT_PORT 9160
// Value handed to msleep() between node status rechecks
#define CCM_NAP 100
// Maximum number of node status rechecks performed by is_node_down/is_node_up
#define CCM_RETRIES 100 // Up to 10 seconds for retry based on CCM_NAP
// CCM node status
// Status words compared against the lower-cased output of the CCM "status" command
#define CCM_NODE_STATUS_DECOMMISSIONED "decommissioned"
#define CCM_NODE_STATUS_DOWN "down"
#define CCM_NODE_STATUS_UNINITIALIZED "(not initialized)"
#define CCM_NODE_STATUS_UP "up"
// Workload value initialization
// Names of the DSE workloads; create_cluster() indexes dse_workloads_ with a
// DseWorkload value, so the order here presumably mirrors the DseWorkload
// enumeration - confirm against its declaration before reordering
const std::string DSE_WORKLOADS[] = { "cassandra", "cfs", "dsefs", "graph",
"hadoop", "solr", "spark" };
// Vector copy of the workload names (built from the array's begin/end pointers)
const std::vector<std::string>
CCM::Bridge::dse_workloads_(DSE_WORKLOADS,
DSE_WORKLOADS + sizeof(DSE_WORKLOADS) / sizeof(DSE_WORKLOADS[0]));
// Default workload list: a single entry, the Cassandra workload
const CCM::DseWorkload DEFAULT_WORKLOAD[] = { CCM::DSE_WORKLOAD_CASSANDRA };
const std::vector<CCM::DseWorkload> CCM::Bridge::DEFAULT_DSE_WORKLOAD(
DEFAULT_WORKLOAD, DEFAULT_WORKLOAD + sizeof(DEFAULT_WORKLOAD) / sizeof(DEFAULT_WORKLOAD[0]));
using namespace CCM;
// Construct the bridge and, for remote deployments in libssh2 enabled builds,
// open the socket and establish the SSH session.
//
// When the server type is not Cassandra, server_version is re-interpreted as a
// DSE/DDAC version and the Cassandra version is derived from it.
//
// When CASS_USE_LIBSSH2 is not defined the deployment is forced to LOCAL with
// host "127.0.0.1"; deployment_type, host, port, username, password,
// public_key and private_key are not used.
//
// Throws BridgeException when use_install_dir is set while install_dir is
// empty, or when the socket connection to the remote host cannot be
// established (the SocketException is converted).
CCM::Bridge::Bridge(
CassVersion server_version /*= DEFAULT_CASSANDRA_VERSION*/, bool use_git /*= DEFAULT_USE_GIT*/,
const std::string& branch_tag /*= ""*/, bool use_install_dir /*=DEFAULT_USE_INSTALL_DIR*/,
const std::string& install_dir /*=""*/, ServerType server_type /*= DEFAULT_SERVER_TYPE*/,
std::vector<DseWorkload> dse_workload /*= DEFAULT_DSE_WORKLOAD*/,
const std::string& cluster_prefix /*= DEFAULT_CLUSTER_PREFIX*/,
DseCredentialsType dse_credentials_type /*= DEFAULT_DSE_CREDENTIALS*/,
const std::string& dse_username /*= ""*/, const std::string& dse_password /*= ""*/,
DeploymentType deployment_type /*= DEFAULT_DEPLOYMENT*/,
AuthenticationType authentication_type /*= DEFAULT_AUTHENTICATION*/,
const std::string& host /*= DEFAULT_HOST*/, short port /*= DEFAULT_REMOTE_DEPLOYMENT_PORT*/,
const std::string& username /*= DEFAULT_USERNAME*/,
const std::string& password /*= DEFAULT_PASSWORD*/, const std::string& public_key /*= ""*/,
const std::string& private_key /*= ""*/, bool is_verbose /*= DEFAULT_IS_VERBOSE*/)
: cassandra_version_(server_version)
// Placeholder; replaced below when the server type is DSE/DDAC
, dse_version_(DEFAULT_DSE_VERSION)
, use_git_(use_git)
, branch_tag_(branch_tag)
, use_install_dir_(use_install_dir)
, install_dir_(install_dir)
, server_type_(server_type)
, dse_workload_(dse_workload)
, cluster_prefix_(cluster_prefix)
, authentication_type_(authentication_type)
, dse_credentials_type_(dse_credentials_type)
, dse_username_(dse_username)
, dse_password_(dse_password)
#ifdef CASS_USE_LIBSSH2
, deployment_type_(deployment_type)
, host_(host)
// SSH resources start out unset; they are created further down for REMOTE deployments
, session_(NULL)
, channel_(NULL)
, socket_(NULL)
#else
// Force local deployment only
, deployment_type_(DeploymentType::LOCAL)
, host_("127.0.0.1")
#endif
, is_verbose_(is_verbose) {
#ifdef _WIN32
#ifdef _DEBUG
// Enable automatic execution of the memory leak detection upon exit
_CrtSetDbgFlag(_CRTDBG_ALLOC_MEM_DF | _CRTDBG_LEAK_CHECK_DF);
#endif
#endif
// Determine if DSE/DDAC is being used
if (!is_cassandra()) {
// The supplied version is a DSE/DDAC version; derive the Cassandra version from it
dse_version_ = DseVersion(server_version.to_string());
cassandra_version_ = dse_version_.get_cass_version();
}
// Determine if installation directory can be used
if (use_install_dir_ && install_dir_.empty()) {
throw BridgeException("Directory must not be blank");
}
#ifdef CASS_USE_LIBSSH2
// Determine if libssh2 needs to be initialized
if (deployment_type_ == DeploymentType::REMOTE) {
// Initialize the socket
try {
initialize_socket(host_, port);
} catch (SocketException& se) {
// Re-throw the exception as a BridgeException
// NOTE(review): finalize_libssh2() runs here before initialize_libssh2()
// has been called; presumably it is used to release the socket - confirm
finalize_libssh2();
throw BridgeException(se.what());
}
// Initialize the libssh2 connection
initialize_libssh2();
// Authenticate and establish the libssh2 connection
establish_libssh2_connection(authentication_type_, username, password, public_key, private_key);
}
#endif
}
// Tear down the bridge. Only remote deployments in libssh2 enabled builds
// have anything to release: the terminal is closed and libssh2 is finalized.
CCM::Bridge::~Bridge() {
#ifdef CASS_USE_LIBSSH2
if (deployment_type_ == DeploymentType::REMOTE) {
// Close the terminal first, then finalize libssh2
close_libssh2_terminal();
finalize_libssh2();
}
#endif
}
// Issue the CCM "clear" command
void CCM::Bridge::clear_cluster_data() {
  // The command consists solely of the "clear" verb
  std::vector<std::string> command(1, "clear");
  execute_ccm_command(command);
}
std::string CCM::Bridge::cluster_contact_points(bool is_all /*= true*/) {
// Determine if all the nodes are being returned or just the available nodes
if (!is_all) {
// Create the cluster liveset command and execute
std::vector<std::string> liveset_command;
liveset_command.push_back("liveset");
return execute_ccm_command(liveset_command);
} else {
ClusterStatus status = cluster_status();
std::stringstream contact_points;
std::string ip_prefix = get_ip_prefix();
for (unsigned i = 1; i <= status.node_count; ++i) {
if (i > 1) {
contact_points << ",";
}
contact_points << ip_prefix << i;
}
return contact_points.str();
}
}
std::vector<std::string> CCM::Bridge::cluster_ip_addresses(bool is_all /*= true*/) {
// Get and sort the IPv4 addresses and return the array/vector
std::vector<std::string> ip_addresses = explode(cluster_contact_points(is_all), ',');
std::sort(ip_addresses.begin(), ip_addresses.end());
return ip_addresses;
}
// Execute the CCM "status" command and classify every node it reports.
//
// Each output line is trimmed and lower-cased; only lines starting with
// "node" are considered. After removing the "node" prefix and the first
// colon, the line is split into tokens: the first token is appended to the
// IP prefix to form the node address and the second is the node status.
//
// Returns the status with node_count incremented once per node line that has
// at least two tokens and the node address stored in the list matching its
// state (decommissioned, uninitialized, down or up). Lines that cannot be
// classified are reported through LOG_ERROR.
ClusterStatus CCM::Bridge::cluster_status() {
  // Create the cluster status command and execute
  std::vector<std::string> status_command;
  status_command.push_back("status");
  std::string ccm_output = execute_ccm_command(status_command);

  // Iterate over the output line and parse the tokens
  std::string ip_prefix = get_ip_prefix();
  ClusterStatus status;
  std::istringstream parser(ccm_output);
  for (std::string token; std::getline(parser, token);) {
    if (!token.empty()) {
      // Handle node lines only
      std::string current_line = to_lower(trim(token));
      if (current_line.compare(0, 4, "node") == 0) {
        // Remove node and colon
        current_line.replace(0, 4, "");
        size_t colon_index = current_line.find(':');
        if (colon_index != std::string::npos) {
          current_line.replace(colon_index, 1, "");
        }

        // Split into key and value (node and status) and update cluster status
        std::vector<std::string> tokens = explode(current_line);
        if (tokens.size() >= 2) {
          std::string node_ip_address = ip_prefix + tokens[0];
          std::string node_status(tokens[1]);
          ++status.node_count;

          // Determine the status being updated
          if (node_status.compare(CCM_NODE_STATUS_DECOMMISSIONED) == 0 ||
              node_status.compare("decommisionned") ==
                  0) { // Handle misspelling of decommissioned for older CCM versions
            status.nodes_decommissioned.push_back(node_ip_address);
          } else if (node_status.compare(CCM_NODE_STATUS_DOWN) == 0) {
            // Determine if the node is actually uninitialized; the marker
            // "(not initialized)" arrives as two extra tokens after "down"
            if (tokens.size() == 4 &&
                (tokens[2] + " " + tokens[3]).compare(CCM_NODE_STATUS_UNINITIALIZED) == 0) {
              status.nodes_uninitialized.push_back(node_ip_address);
            } else {
              status.nodes_down.push_back(node_ip_address);
            }
          } else if (node_status.compare(CCM_NODE_STATUS_UP) == 0) {
            status.nodes_up.push_back(node_ip_address);
          } else {
            LOG_ERROR("Node status \"" << node_status << "\" is not valid");
          }
        } else {
          // This branch is reached with fewer than two tokens (node and status)
          LOG_ERROR("Too few tokens detected in \"" << current_line
                                                    << "\" to determine node status");
        }
      }
    }
  }
  return status;
}
bool CCM::Bridge::create_cluster(std::vector<unsigned short> data_center_nodes,
bool with_vnodes /*= false*/,
bool is_password_authenticator /*= false*/,
bool is_ssl /*= false*/,
bool is_client_authentication /*= false*/) {
// Generate the cluster name and determine if it needs to be created
std::string active_cluster_name = get_active_cluster();
std::string cluster_name =
generate_cluster_name(cassandra_version_, data_center_nodes, with_vnodes,
is_password_authenticator, is_ssl, is_client_authentication);
for (std::vector<DseWorkload>::iterator iterator = dse_workload_.begin();
iterator != dse_workload_.end(); ++iterator) {
if (is_dse() && *iterator != DSE_WORKLOAD_CASSANDRA) {
cluster_name.append("-").append(dse_workloads_[*iterator]);
}
}
if (!switch_cluster(cluster_name)) {
// Ensure any active cluster is stopped
if (!get_active_cluster().empty()) {
stop_cluster();
}
// Create the cluster create command and execute
std::vector<std::string> create_command;
create_command.push_back("create");
if (use_install_dir_ && !install_dir_.empty()) {
create_command.push_back("--install-dir=" + install_dir_);
} else {
create_command.push_back("-v");
if (is_cassandra()) {
if (use_git_) {
if (branch_tag_.empty()) {
create_command.push_back("git:cassandra-" + cassandra_version_.to_string());
} else {
create_command.push_back("git:" + branch_tag_);
}
} else {
create_command.push_back(cassandra_version_.ccm_version());
}
} else {
if (use_git_) {
if (branch_tag_.empty()) {
create_command.push_back("git:" + dse_version_.to_string());
} else {
create_command.push_back("git:" + branch_tag_);
}
} else {
create_command.push_back(dse_version_.ccm_version());
}
if (dse_credentials_type_ == DseCredentialsType::USERNAME_PASSWORD) {
create_command.push_back("--dse-username=" + dse_username_);
create_command.push_back("--dse-password=" + dse_password_);
}
}
}
if (is_dse()) {
create_command.push_back("--dse");
} else if (is_ddac()) {
create_command.push_back("--ddac");
}
create_command.push_back("-b");
// Determine if password authenticator or SSL and client authentication
// should be enabled
if (is_password_authenticator) {
create_command.push_back("--pwd-auth");
}
if (is_ssl) {
// TODO: Use test::Utils::temp_directory() after boost tests are removed and bridge is merged
// into testing framework
#ifdef _WIN32
char* temp = getenv("TEMP");
std::string ssl_command = "--ssl=";
ssl_command.append(temp);
ssl_command.append("\\");
ssl_command.append("ssl");
#else
std::string ssl_command = "--ssl=/tmp/ssl";
#endif
create_command.push_back(ssl_command);
if (is_client_authentication) {
create_command.push_back("--require_client_auth");
}
}
// Add the name of the cluster to create
create_command.push_back(cluster_name);
// Execute the CCM create command
execute_ccm_command(create_command);
// Generate the cluster update configuration command and execute
execute_ccm_command(generate_create_updateconf_command(cassandra_version_));
if (is_dse() && dse_version_ >= "6.7.0") {
update_cluster_configuration("user_defined_function_fail_micros", "5000000");
}
// Create the cluster populate command and execute
std::string cluster_nodes = generate_cluster_nodes(data_center_nodes);
std::string cluster_ip_prefix = get_ip_prefix();
std::vector<std::string> populate_command;
populate_command.push_back("populate");
populate_command.push_back("-n");
populate_command.push_back(cluster_nodes);
populate_command.push_back("-i");
populate_command.push_back(cluster_ip_prefix);
if (with_vnodes) {
populate_command.push_back("--vnodes");
}
execute_ccm_command(populate_command);
// Update the cluster configuration (set num_tokens)
if (with_vnodes) {
// Maximum number of tokens is 1536
update_cluster_configuration("num_tokens", "1536");
}
// Set the DSE workload (if applicable)
if (is_dse() && !(dse_workload_.size() == 1 && dse_workload_[0] == DSE_WORKLOAD_CASSANDRA)) {
set_dse_workloads(dse_workload_);
}
}
// Indicate if the cluster was created or switched
return !(active_cluster_name.compare(cluster_name) == 0);
}
// Check nodes 1..node_count in order and report whether all of them are down.
// The scan stops at the first node that is not down.
bool CCM::Bridge::is_cluster_down() {
  ClusterStatus status = cluster_status();
  bool all_down = true;
  for (unsigned int node = 1; all_down && node <= status.node_count; ++node) {
    all_down = is_node_down(node);
  }
  return all_down;
}
// Check nodes 1..node_count in order and report whether all of them are up.
// The scan stops at the first node that is not up.
bool CCM::Bridge::is_cluster_up() {
  ClusterStatus status = cluster_status();
  bool all_up = true;
  for (unsigned int node = 1; all_up && node <= status.node_count; ++node) {
    all_up = is_node_up(node);
  }
  return all_up;
}
// Issue the CCM "stop --hang-up" command and report whether the cluster is down
bool CCM::Bridge::hang_up_cluster() {
  static const char* const arguments[] = { "stop", "--hang-up" };
  std::vector<std::string> command(arguments,
                                   arguments + sizeof(arguments) / sizeof(arguments[0]));
  execute_ccm_command(command);

  // The result reflects the state of the cluster after the command
  return is_cluster_down();
}
// Stop the cluster with the kill flag set
bool CCM::Bridge::kill_cluster() {
  const bool is_kill = true;
  return stop_cluster(is_kill);
}
void CCM::Bridge::remove_cluster() { remove_cluster(get_active_cluster()); }
// Issue the CCM "remove <cluster_name>" command
void CCM::Bridge::remove_cluster(const std::string& cluster_name) {
  std::vector<std::string> command(1, "remove");
  command.push_back(cluster_name);
  execute_ccm_command(command);
}
// Remove the available clusters.
// With is_all every cluster is removed; otherwise only the clusters whose
// name starts with cluster_prefix_ are removed.
void CCM::Bridge::remove_all_clusters(bool is_all /*= false*/) {
  const std::vector<std::string> available = get_available_clusters();
  for (size_t index = 0; index < available.size(); ++index) {
    const std::string& name = available[index];
    // The prefix is only inspected when not every cluster is being removed
    if (is_all || name.compare(0, cluster_prefix_.size(), cluster_prefix_) == 0) {
      remove_cluster(name);
    }
  }
}
// Start the active cluster and wait for it to become available.
//
// Issues the CCM "start" command with --wait-other-notice and
// --wait-for-binary-proto. On Windows local deployments running Cassandra
// v2.2.4+ --quiet-windows is added as well. Every JVM argument that is not
// blank after trimming is passed, trimmed, as --jvm_arg=<argument>.
//
// jvm_arguments  JVM arguments to apply when starting the nodes
//
// Returns true when every node of the cluster is up after the command.
bool CCM::Bridge::start_cluster(
    std::vector<std::string> jvm_arguments /*= DEFAULT_JVM_ARGUMENTS*/) {
  // Create the cluster start command and execute
  std::vector<std::string> start_command;
  start_command.push_back("start");
  start_command.push_back("--wait-other-notice");
  start_command.push_back("--wait-for-binary-proto");
#ifdef _WIN32
  if (deployment_type_ == DeploymentType::LOCAL) {
    if (cassandra_version_ >= "2.2.4") {
      start_command.push_back("--quiet-windows");
    }
  }
#endif
  for (std::vector<std::string>::const_iterator iterator = jvm_arguments.begin();
       iterator != jvm_arguments.end(); ++iterator) {
    std::string jvm_argument = trim(*iterator);
    if (!jvm_argument.empty()) {
      // Use the trimmed value so surrounding whitespace does not end up
      // inside the command argument
      start_command.push_back("--jvm_arg=" + jvm_argument);
    }
  }
  execute_ccm_command(start_command);

  // Ensure the cluster is up
  return is_cluster_up();
}
// Start the cluster with at most one JVM argument; an empty argument results
// in an empty argument list
bool CCM::Bridge::start_cluster(std::string jvm_argument /*= ""*/) {
  std::vector<std::string> jvm_arguments(jvm_argument.empty() ? 0 : 1, jvm_argument);
  return start_cluster(jvm_arguments);
}
// Issue the CCM "stop" command (adding --not-gently when is_kill is set) and
// report whether the cluster is down afterwards
bool CCM::Bridge::stop_cluster(bool is_kill /*= false*/) {
  std::vector<std::string> command(1, "stop");
  if (is_kill) {
    command.push_back("--not-gently");
  }
  execute_ccm_command(command);
  return is_cluster_down();
}
bool CCM::Bridge::switch_cluster(const std::string& cluster_name) {
// Get the active cluster and the available clusters
std::string active_cluster;
std::vector<std::string> clusters = get_available_clusters(active_cluster);
// Determine if the switch should be performed
if (active_cluster.compare(trim(cluster_name)) != 0) {
// Ensure the cluster is in the list
if (std::find(clusters.begin(), clusters.end(), cluster_name) != clusters.end()) {
if (!active_cluster.empty()) {
kill_cluster();
}
// Create the cluster switch command and clear the data
std::vector<std::string> switch_command;
switch_command.push_back("switch");
switch_command.push_back(cluster_name);
execute_ccm_command(switch_command);
clear_cluster_data();
return true;
}
} else {
// Cluster requested is already active
return true;
}
// Unable to switch the cluster
return false;
}
// Apply configuration entries to the cluster.
// Non-YAML entries are sent together in one "updateconf"/"updatedseconf"
// command; YAML entries are applied one at a time, in order.
void CCM::Bridge::update_cluster_configuration(std::vector<std::string> key_value_pairs,
                                               bool is_dse /*= false*/, bool is_yaml /*= false*/) {
  if (!is_yaml) {
    // The command verb goes in front of the key/value pairs
    const std::string verb = is_dse ? "updatedseconf" : "updateconf";
    key_value_pairs.insert(key_value_pairs.begin(), verb);
    execute_ccm_command(key_value_pairs);
    return;
  }

  for (size_t index = 0; index < key_value_pairs.size(); ++index) {
    update_cluster_configuration_yaml(key_value_pairs[index], is_dse);
  }
}
// Apply a single "key:value" configuration entry to the cluster using the
// "updateconf" (or "updatedseconf" when is_dse is set) command
void CCM::Bridge::update_cluster_configuration(const std::string& key, const std::string& value,
                                               bool is_dse /*= false*/) {
  std::vector<std::string> command(1, is_dse ? "updatedseconf" : "updateconf");
  command.push_back(key + ":" + value);
  execute_ccm_command(command);
}
// Apply a literal YAML configuration to the cluster using the -y option of
// the "updateconf" (or "updatedseconf" when is_dse is set) command
void CCM::Bridge::update_cluster_configuration_yaml(const std::string& yaml,
                                                    bool is_dse /*= false*/) {
  std::vector<std::string> command(1, is_dse ? "updatedseconf" : "updateconf");
  command.push_back("-y");
  command.push_back(yaml);
  execute_ccm_command(command);
}
// Apply configuration entries to a single node.
//
// The command is built as "<node name> updateconf <key_value_pairs...>",
// matching the key/value overload and the other node level commands of this
// class, which all place the node name before the command.
//
// node             Node to update
// key_value_pairs  Entries handed to updateconf unchanged
void CCM::Bridge::update_node_configuration(unsigned int node,
                                            std::vector<std::string> key_value_pairs) {
  // Both values are inserted at the front, so the command goes in first and
  // the node name second in order for the node name to end up leading
  key_value_pairs.insert(key_value_pairs.begin(), "updateconf");
  key_value_pairs.insert(key_value_pairs.begin(), generate_node_name(node));
  execute_ccm_command(key_value_pairs);
}
// Apply a single "key:value" configuration entry to a node using
// "<node name> updateconf"
void CCM::Bridge::update_node_configuration(unsigned int node, const std::string& key,
                                            const std::string& value) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("updateconf");
  command.push_back(key + ":" + value);
  execute_ccm_command(command);
}
// Add the next available node to the cluster using the CCM "add" command.
//
// The node address is the IP prefix followed by the node number; the values
// for -j and -r are 7000 and 2000 offset by 100 per node number. The data
// center is only passed when not empty and --dse only for DSE.
//
// Returns the number of the node that was added.
unsigned int CCM::Bridge::add_node(const std::string& data_center /*= ""*/) {
  const unsigned int node = get_next_available_node();

  // Derive the per-node values
  std::ostringstream ip_address;
  ip_address << get_ip_prefix() << node;
  std::ostringstream jmx;
  jmx << (7000 + (100 * node));
  std::ostringstream remote_debug;
  remote_debug << (2000 + (100 * node));

  // Assemble the command
  std::vector<std::string> command(1, "add");
  command.push_back("-b");
  command.push_back("-i");
  command.push_back(ip_address.str());
  command.push_back("-j");
  command.push_back(jmx.str());
  command.push_back("-r");
  command.push_back(remote_debug.str());
  if (!data_center.empty()) {
    command.push_back("-d");
    command.push_back(data_center);
  }
  if (is_dse()) {
    command.push_back("--dse");
  }
  command.push_back(generate_node_name(node));
  execute_ccm_command(command);

  return node;
}
unsigned int CCM::Bridge::bootstrap_node(const std::vector<std::string>& jvm_arguments,
const std::string& data_center /*= ""*/) {
unsigned int node = add_node(data_center);
start_node(node, jvm_arguments);
return node;
}
unsigned int CCM::Bridge::bootstrap_node(const std::string& jvm_argument /*= ""*/,
const std::string& data_center /*= ""*/) {
unsigned int node = add_node(data_center);
start_node(node, jvm_argument);
return node;
}
// Issue "<node name> decommission" and report whether the node is listed as
// decommissioned afterwards. --force is only added when requested and the
// server is Cassandra v3.12+ or, for non-Cassandra servers, v5.1.0+.
bool CCM::Bridge::decommission_node(unsigned int node, bool is_force /*= false*/) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("decommission");
  if (is_force) {
    // The version that gates the option depends on the server type
    const bool is_force_supported =
        is_cassandra() ? (cassandra_version_ >= "3.12") : (dse_version_ >= "5.1.0");
    if (is_force_supported) {
      command.push_back("--force");
    }
  }
  execute_ccm_command(command);
  return is_node_decommissioned(node);
}
// Issue "<node name> nodetool disablebinary"
void CCM::Bridge::disable_node_binary_protocol(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("nodetool");
  command.push_back("disablebinary");
  execute_ccm_command(command);
}
// Issue "<node name> nodetool disablegossip"
void CCM::Bridge::disable_node_gossip(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("nodetool");
  command.push_back("disablegossip");
  execute_ccm_command(command);
}
// Issue "<node name> nodetool settraceprobability 0"
void CCM::Bridge::disable_node_trace(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("nodetool");
  command.push_back("settraceprobability");
  command.push_back("0");
  execute_ccm_command(command);
}
// Issue "<node name> nodetool enablebinary"
void CCM::Bridge::enable_node_binary_protocol(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("nodetool");
  command.push_back("enablebinary");
  execute_ccm_command(command);
}
// Issue "<node name> nodetool enablegossip"
void CCM::Bridge::enable_node_gossip(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("nodetool");
  command.push_back("enablegossip");
  execute_ccm_command(command);
}
// Issue "<node name> nodetool settraceprobability 1"
void CCM::Bridge::enable_node_trace(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("nodetool");
  command.push_back("settraceprobability");
  command.push_back("1");
  execute_ccm_command(command);
}
// Run a CQL statement through "<node name> cqlsh -x".
// The statement is terminated with ';' and wrapped in double quotes; quotes
// inside cql are passed through unchanged.
void CCM::Bridge::execute_cql_on_node(unsigned int node, const std::string& cql) {
  const std::string quoted_statement = "\"" + cql + ";\"";

  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("cqlsh");
  command.push_back("-x");
  command.push_back(quoted_statement);
  execute_ccm_command(command);
}
// Decommission the node with the force flag set
bool CCM::Bridge::force_decommission_node(unsigned int node) {
  const bool is_force = true;
  return decommission_node(node, is_force);
}
// Issue "<node name> stop --hang-up" and report whether the node is down
bool CCM::Bridge::hang_up_node(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("stop");
  command.push_back("--hang-up");
  execute_ccm_command(command);
  return is_node_down(node);
}
// Stop the node with the kill flag set
bool CCM::Bridge::kill_node(unsigned int node) {
  const bool is_kill = true;
  return stop_node(node, is_kill);
}
// Issue "<node name> pause"
void CCM::Bridge::pause_node(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("pause");
  execute_ccm_command(command);
}
// Issue "<node name> resume"
void CCM::Bridge::resume_node(unsigned int node) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("resume");
  execute_ccm_command(command);
}
// Start a node and wait for it to become available.
//
// Issues "<node name> start" with --wait-other-notice and
// --wait-for-binary-proto. On Windows local deployments running Cassandra
// v2.2.4+ --quiet-windows is added as well. Every JVM argument that is not
// blank after trimming is passed, trimmed, as --jvm_arg=<argument>.
//
// node           Node to start
// jvm_arguments  JVM arguments to apply when starting the node
//
// Returns true when the node is up after the command.
bool CCM::Bridge::start_node(
    unsigned int node, const std::vector<std::string>& jvm_arguments /*= DEFAULT_JVM_ARGUMENTS*/) {
  // Create the node start command and execute
  std::vector<std::string> start_node_command;
  start_node_command.push_back(generate_node_name(node));
  start_node_command.push_back("start");
  start_node_command.push_back("--wait-other-notice");
  start_node_command.push_back("--wait-for-binary-proto");
#ifdef _WIN32
  if (deployment_type_ == DeploymentType::LOCAL) {
    if (cassandra_version_ >= "2.2.4") {
      start_node_command.push_back("--quiet-windows");
    }
  }
#endif
  for (std::vector<std::string>::const_iterator iterator = jvm_arguments.begin();
       iterator != jvm_arguments.end(); ++iterator) {
    std::string jvm_argument = trim(*iterator);
    if (!jvm_argument.empty()) {
      // Use the trimmed value so surrounding whitespace does not end up
      // inside the command argument
      start_node_command.push_back("--jvm_arg=" + jvm_argument);
    }
  }
  execute_ccm_command(start_node_command);

  // Ensure the node is up
  return is_node_up(node);
}
// Start a node with a single JVM argument by delegating to the overload that
// takes a list of arguments
bool CCM::Bridge::start_node(unsigned int node, const std::string& jvm_argument) {
  std::vector<std::string> jvm_arguments(1, jvm_argument);
  return start_node(node, jvm_arguments);
}
// Issue "<node name> stop" (adding --not-gently when is_kill is set) and
// report whether the node is down afterwards
bool CCM::Bridge::stop_node(unsigned int node, bool is_kill /*= false*/) {
  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("stop");
  if (is_kill) {
    command.push_back("--not-gently");
  }
  execute_ccm_command(command);
  return is_node_down(node);
}
// Return the host with its final character removed (e.g. "127.0.0.1" yields
// "127.0.0."); node numbers are appended to this prefix to form addresses
std::string CCM::Bridge::get_ip_prefix() {
  const std::string::size_type prefix_length = host_.size() - 1;
  return host_.substr(0, prefix_length);
}
// Determine the Cassandra version by issuing "<node 1 name> version" and
// parsing the text that follows "ReleaseVersion:" in the output.
// Throws BridgeException when the marker is not present in the output.
CassVersion CCM::Bridge::get_cassandra_version() {
  std::vector<std::string> command(1, generate_node_name(1));
  command.push_back("version");
  const std::string output = execute_ccm_command(command);

  // Without the release marker the version cannot be determined
  const std::string marker = "ReleaseVersion:";
  const size_t marker_index = output.find(marker);
  if (marker_index == std::string::npos) {
    throw BridgeException(
        "Unable to determine version information from active Cassandra cluster \"" +
        get_active_cluster() + "\"");
  }

  // Everything after the marker is the version
  return CassVersion(trim(output.substr(marker_index + marker.size())));
}
// Determine the DSE/DDAC version by issuing "<node 1 name> dse -v"; the
// trimmed output is the version.
// Throws BridgeException when the trimmed output is empty.
DseVersion CCM::Bridge::get_dse_version() {
  std::vector<std::string> command(1, generate_node_name(1));
  command.push_back("dse");
  command.push_back("-v");
  const std::string version = trim(execute_ccm_command(command));

  // Empty output means the version cannot be determined
  if (version.empty()) {
    throw BridgeException(
        "Unable to determine version information from active DSE/DDAC cluster \"" +
        get_active_cluster() + "\"");
  }
  return DseVersion(version);
}
// Assign a single DSE workload to a node by delegating to the overload that
// takes a list of workloads.
//
// node      Node to assign the workload to
// workload  Workload to assign
// is_kill   Passed on; used when the node has to be stopped first
//
// Returns the result of set_dse_workloads() for the node.
bool CCM::Bridge::set_dse_workload(unsigned int node, DseWorkload workload,
                                   bool is_kill /*= false */) {
  std::vector<DseWorkload> workloads;
  workloads.push_back(workload);
  // Apply the workload to the requested node (not a fixed node number)
  return set_dse_workloads(node, workloads, is_kill);
}
// Assign DSE workloads to a node using "<node name> setworkload".
//
// The workloads are also stored in dse_workload_. A node that is not down is
// stopped before the workloads are assigned and started again afterwards.
//
// Returns true when the node was active and therefore restarted.
// Throws BridgeException when workloads is empty.
bool CCM::Bridge::set_dse_workloads(unsigned int node, std::vector<DseWorkload> workloads,
                                    bool is_kill /*= false */) {
  if (workloads.empty()) {
    throw BridgeException("No workloads to assign");
  }

  // Remember the workloads and render them for the command
  dse_workload_ = workloads;
  const std::string rendered_workloads = generate_dse_workloads(workloads);

  // An active node has to be stopped before its workload can change
  const bool was_node_active = !is_node_down(node);
  if (was_node_active) {
    LOG("Stopping active node \"" << node << "\" and assigning workload(s) \""
                                  << rendered_workloads << "\"");
    stop_node(node, is_kill);
  }

  std::vector<std::string> command(1, generate_node_name(node));
  command.push_back("setworkload");
  command.push_back(rendered_workloads);
  execute_ccm_command(command);

  // Bring a previously active node back
  if (was_node_active) {
    LOG("Restarting node \"" << node << "\" to applying workload(s) \"" << rendered_workloads
                             << "\"");
    start_node(node);
  }
  return was_node_active;
}
// Assign a single DSE workload to the cluster by delegating to the overload
// that takes a list of workloads
bool CCM::Bridge::set_dse_workload(DseWorkload workload, bool is_kill /*= false */) {
  std::vector<DseWorkload> single_workload(1, workload);
  return set_dse_workloads(single_workload, is_kill);
}
bool CCM::Bridge::set_dse_workloads(std::vector<DseWorkload> workloads, bool is_kill /*= false */) {
// Ensure the workloads can be processed
if (workloads.empty()) {
throw BridgeException("No workloads to assign");
}
// Determine if the cluster is currently active/up
bool was_cluster_active = false;
std::string cluster = get_active_cluster();
if (!is_cluster_down()) {
LOG("Stopping active cluster \"" << cluster << "\" and assigning workload(s) \""
<< generate_dse_workloads(workloads) << "\"");
stop_cluster(is_kill);
was_cluster_active = true;
}
// Iterate over each node and set the DSE workload
ClusterStatus status = cluster_status();
for (unsigned int i = 1; i <= status.node_count; ++i) {
set_dse_workloads(i, workloads, false);
}
// Determine if the cluster should be restarted
if (was_cluster_active) {
LOG("Restarting cluster \"" << cluster << "\" and applying workload(s) \""
<< generate_dse_workloads(workloads) << "\"");
start_cluster();
}
return was_cluster_active;
}
// Report whether the address of the node (IP prefix followed by the node
// number) is among the decommissioned nodes of the cluster status
bool CCM::Bridge::is_node_decommissioned(unsigned int node) {
  std::stringstream address_builder;
  address_builder << get_ip_prefix() << node;
  const std::string address = address_builder.str();

  const std::vector<std::string> decommissioned = cluster_status().nodes_decommissioned;
  return std::find(decommissioned.begin(), decommissioned.end(), address) !=
         decommissioned.end();
}
// Check whether a node is down (no connection can be established).
//
// With `is_quick_check` a single availability probe is performed. Otherwise
// the probe is repeated up to CCM_RETRIES times, sleeping CCM_NAP
// milliseconds between attempts, until the node becomes unavailable.
//
// Returns true as soon as a probe fails; false if every probe succeeded.
bool CCM::Bridge::is_node_down(unsigned int node, bool is_quick_check /*= false*/) {
  if (is_quick_check) {
    return !is_node_availabe(node);
  }

  for (unsigned int attempt = 1; attempt <= CCM_RETRIES; ++attempt) {
    if (!is_node_availabe(node)) {
      return true;
    }

    // Node still accepts connections; wait and probe again
    std::string cluster = get_active_cluster();
    LOG("[#" << attempt << "] - Attempting to recheck node down status for node \"" << node
             << "\" in cluster \"" << cluster << "\"");
    msleep(CCM_NAP);
  }

  // A connection could still be established on every attempt
  return false;
}
// Check whether a node is up (a connection can be established).
//
// With `is_quick_check` a single availability probe is performed. Otherwise
// the probe is repeated up to CCM_RETRIES times, sleeping CCM_NAP
// milliseconds between attempts, until the node becomes available.
//
// Returns true as soon as a probe succeeds; false if every probe failed.
bool CCM::Bridge::is_node_up(unsigned int node, bool is_quick_check /*= false*/) {
  if (is_quick_check) {
    return is_node_availabe(node);
  }

  for (unsigned int attempt = 1; attempt <= CCM_RETRIES; ++attempt) {
    if (is_node_availabe(node)) {
      return true;
    }

    // Node is not accepting connections yet; wait and probe again
    std::string cluster = get_active_cluster();
    LOG("[#" << attempt << "] - Attempting to recheck node up status for node \"" << node
             << "\" in cluster \"" << cluster << "\"");
    msleep(CCM_NAP);
  }

  // No connection could be established on any attempt
  return false;
}
#ifdef CASS_USE_LIBSSH2
// Create the bridge's socket and connect it to `host`:`port`.
//
// The new Socket is stored in socket_ before the connection attempt, so it
// remains owned by the bridge (and is released by finalize_libssh2) even if
// establish_connection throws.
void CCM::Bridge::initialize_socket(const std::string& host, short port) {
  socket_ = new Socket();
  socket_->establish_connection(host, port);
}
// Synchronize the socket according to the direction(s) the libssh2 session
// is currently blocked on.
//
// The inbound/outbound flags reported by libssh2_session_block_directions are
// translated into booleans and handed to Socket::synchronize.
void CCM::Bridge::synchronize_socket() {
  const int directions = libssh2_session_block_directions(session_);
  const bool wants_read = (directions & LIBSSH2_SESSION_BLOCK_INBOUND) != 0;
  const bool wants_write = (directions & LIBSSH2_SESSION_BLOCK_OUTBOUND) != 0;
  socket_->synchronize(wants_read, wants_write);
}
// Initialize libssh2, create a non-blocking session and perform the SSH
// handshake over the already connected socket_.
//
// On any failure the libssh2 state is torn down via finalize_libssh2() and a
// BridgeException describing the failure is thrown.
void CCM::Bridge::initialize_libssh2() {
  // Initialize libssh2
  int rc = libssh2_init(LIBSSH2_INIT_ALL);
  if (rc) {
    finalize_libssh2();
    std::stringstream message;
    message << "[libssh2] Failed initialization with error code \"" << rc << "\"";
    throw BridgeException(message.str());
  }

  // Initialize and create the libssh2 session
  session_ = libssh2_session_init();
  if (!session_) {
    finalize_libssh2();
    throw BridgeException("[libssh2] Failed session initialization");
  }

  // Disable blocking on the session
  libssh2_session_set_blocking(session_, FALSE);

  // Perform the session handshake; trade banners, exchange keys, setup crypto
  while ((rc = libssh2_session_handshake(session_, socket_->get_handle())) ==
         LIBSSH2_ERROR_EAGAIN) {
    ; // no-op
  }
  if (rc) {
    // Determine error that occurred
    std::stringstream message;
    message << "[libssh2] Failed session handshake with error ";
    switch (rc) {
      case LIBSSH2_ERROR_SOCKET_NONE:
        message << "\"the socket is invalid\"";
        break;
      case LIBSSH2_ERROR_BANNER_SEND:
        message << "\"unable to send banner to remote host\"";
        break;
      case LIBSSH2_ERROR_KEX_FAILURE:
        message << "\"encryption key exchange with the remote host failed\"";
        break;
      case LIBSSH2_ERROR_SOCKET_SEND:
        message << "\"unable to send data on socket\"";
        break;
      case LIBSSH2_ERROR_SOCKET_DISCONNECT:
        message << "\"the socket was disconnected\"";
        break;
      case LIBSSH2_ERROR_PROTO:
        message << "\"an invalid SSH protocol response was received on the socket\"";
        break;
      default:
        // Same "code" suffix format as the other libssh2 error reports
        message << "code \"" << rc << "\"";
        break;
    }
    finalize_libssh2();
    throw BridgeException(message.str());
  }
}
// Authenticate the libssh2 session.
//
// Uses username/password authentication when `authentication_type` is
// AuthenticationType::USERNAME_PASSWORD; otherwise public key authentication
// from the given key files (with an empty passphrase) is performed.
//
// On failure the libssh2 state is torn down via finalize_libssh2() and a
// BridgeException naming the authentication mechanism and the error is
// thrown.
void CCM::Bridge::establish_libssh2_connection(AuthenticationType authentication_type,
                                               const std::string& username,
                                               const std::string& password,
                                               const std::string& public_key,
                                               const std::string& private_key) {
  const bool is_password_authentication =
      (authentication_type == AuthenticationType::USERNAME_PASSWORD);

  // Perform the authentication; the session is non-blocking so retry on EAGAIN
  int rc = 0;
  if (is_password_authentication) {
    while ((rc = libssh2_userauth_password(session_, username.c_str(), password.c_str())) ==
           LIBSSH2_ERROR_EAGAIN) {
      ; // no-op
    }
  } else {
    while ((rc = libssh2_userauth_publickey_fromfile(session_, username.c_str(),
                                                     public_key.c_str(), private_key.c_str(),
                                                     "")) == LIBSSH2_ERROR_EAGAIN) {
      ; // no-op
    }
  }

  if (rc) {
    // Report the mechanism that was actually attempted
    std::stringstream message;
    message << "[libssh2] Failed "
            << (is_password_authentication ? "username/password" : "public key")
            << " authentication with error ";
    switch (rc) {
      case LIBSSH2_ERROR_ALLOC:
        message << "\"an internal memory allocation call failed\"";
        break;
      case LIBSSH2_ERROR_SOCKET_SEND:
        message << "\"unable to send data on socket\"";
        break;
      case LIBSSH2_ERROR_SOCKET_TIMEOUT:
        message << "\"timed out waiting for response\"";
        break;
      case LIBSSH2_ERROR_PASSWORD_EXPIRED:
        message << "\"password has expired\"";
        break;
      case LIBSSH2_ERROR_PUBLICKEY_UNVERIFIED:
        message << "\"the username/public key combination was invalid\"";
        break;
      case LIBSSH2_ERROR_AUTHENTICATION_FAILED:
        // Ensure error message is displayed for the authentication type
        if (is_password_authentication) {
          message << "\"invalid username/password\"";
        } else {
          message << "\"authentication using the supplied public key was not accepted\"";
        }
        break;
      default:
        message << "code \"" << rc << "\"";
        break;
    }
    finalize_libssh2();
    throw BridgeException(message.str());
  }
}
// Open a session channel on the libssh2 session and store it in channel_.
//
// The session is non-blocking, so the open is retried (synchronizing the
// socket in between) while libssh2 reports EAGAIN.
//
// Throws BridgeException when the session is not available or the channel
// cannot be opened; in the latter case the libssh2 state is torn down via
// finalize_libssh2() first.
void CCM::Bridge::open_libssh2_terminal() {
  // A channel cannot be opened (nor an error queried) without a session
  if (!session_) {
    throw BridgeException("[libssh2] Session is invalid/terminated");
  }

  // Open a channel; request a shell
  while ((channel_ = libssh2_channel_open_session(session_)) == NULL &&
         libssh2_session_last_error(session_, NULL, NULL, FALSE) == LIBSSH2_ERROR_EAGAIN) {
    synchronize_socket();
  }

  if (!channel_) {
    // Determine error that occurred
    int rc = libssh2_session_last_error(session_, NULL, NULL, FALSE);
    std::stringstream message;
    message << "[libssh2] Failed opening session channel with error ";
    switch (rc) {
      case LIBSSH2_ERROR_ALLOC:
        message << "\"an internal memory allocation call failed\"";
        break;
      case LIBSSH2_ERROR_SOCKET_SEND:
        message << "\"unable to send data on socket\"";
        break;
      case LIBSSH2_ERROR_CHANNEL_FAILURE:
        message << "\"unable to open channel\"";
        break;
      default:
        // Unknown errors are reported by code instead of an empty description
        message << "code \"" << rc << "\"";
        break;
    }
    finalize_libssh2();
    throw BridgeException(message.str());
  }
}
void CCM::Bridge::close_libssh2_terminal() {
if (channel_) {
// Close the libssh2 channel/terminal
int rc = 0;
while ((rc = libssh2_channel_close(channel_)) == LIBSSH2_ERROR_EAGAIN) {
synchronize_socket();
}
if (rc == 0) {
char* exit_signal = NULL;
libssh2_channel_get_exit_status(channel_);
libssh2_channel_get_exit_signal(channel_, &exit_signal, NULL, NULL, NULL, NULL, NULL);
if (exit_signal) {
LOG_ERROR("[libssh2] Failed to close channel with exit signal \"" << exit_signal << "\"");
}
}
if (rc) {
LOG_ERROR("[libssh2] Failed to close channel with error code \"" << rc << "\"");
}
// Free the channel/terminal resources
while ((rc = libssh2_channel_free(channel_)) == LIBSSH2_ERROR_EAGAIN) {
; // no-op
}
if (rc) {
LOG_ERROR("[libssh2] Failed to free channel resources with error code \"" << rc << "\"");
}
channel_ = NULL;
}
}
void CCM::Bridge::finalize_libssh2() {
// Free the libssh2 session
if (session_) {
// Perform session disconnection
int rc = 0;
while ((rc = libssh2_session_disconnect(
session_, "Shutting down libssh2 CCM bridge session")) == LIBSSH2_ERROR_EAGAIN) {
; // no-op
}
if (rc) {
LOG_ERROR("[libssh2] Failed to disconnect session with error code \"" << rc << "\"");
}
while ((rc = libssh2_session_free(session_)) == LIBSSH2_ERROR_EAGAIN) {
; // no-op
}
if (rc) {
LOG_ERROR("[libssh2] Failed to free session resources with error code \"" << rc << "\"");
}
session_ = NULL;
}
// Free the socket (closes connection)
delete socket_;
socket_ = NULL;
// Free up remaining libssh2 memory
libssh2_exit();
#ifndef LIBSSH2_NO_OPENSSL
#ifdef OPENSSL_CLEANUP
// Free OpenSSL resources
RAND_cleanup();
ENGINE_cleanup();
CONF_modules_unload(TRUE);
CONF_modules_free();
EVP_cleanup();
ERR_free_strings();
ERR_remove_state(PID_UNKNOWN);
CRYPTO_cleanup_all_ex_data();
#endif
#endif
}
// Execute a command on the remote host through the libssh2 session.
//
// The command elements are joined with implode() and executed on a freshly
// opened channel; the channel output collected by read_libssh2_terminal() is
// returned after the channel has been closed.
//
// Throws BridgeException when the session is not available or the command
// cannot be executed (the libssh2 state is finalized in the latter case).
std::string CCM::Bridge::execute_libssh2_command(const std::vector<std::string>& command) {
  // A terminated session cannot run commands
  if (!session_) {
    throw BridgeException("[libssh2] Session is invalid/terminated");
  }

  // Every command runs on its own channel
  open_libssh2_terminal();

  // Start the command; retry while the non-blocking session would block
  const std::string command_line = implode(command);
  int rc = libssh2_channel_exec(channel_, command_line.c_str());
  while (rc == LIBSSH2_ERROR_EAGAIN) {
    synchronize_socket();
    rc = libssh2_channel_exec(channel_, command_line.c_str());
  }

  if (rc != 0) {
    // Translate the error into a readable message
    std::stringstream message;
    message << "[libssh2] Failed to execute command with error ";
    if (rc == LIBSSH2_ERROR_ALLOC) {
      message << "\"An internal memory allocation call failed\"";
    } else if (rc == LIBSSH2_ERROR_SOCKET_SEND) {
      message << "\"Unable to send data on socket\"";
    } else if (rc == LIBSSH2_ERROR_CHANNEL_REQUEST_DENIED) {
      message << "\"Request denied\"";
    } else {
      message << "code \"" << rc << "\"";
    }
    finalize_libssh2();
    throw BridgeException(message.str());
  }

  // Collect the output before the channel is closed
  const std::string output = read_libssh2_terminal();
  close_libssh2_terminal();
  return output;
}
#endif
#ifdef CASS_USE_LIBSSH2
// Read everything available on the current channel.
//
// The standard output stream is drained first, followed by the standard error
// stream; both are appended to the same result string. While libssh2 reports
// EAGAIN the socket is synchronized and the read is retried after a CCM_NAP
// millisecond sleep. Any other non-positive result ends the stream's read.
std::string CCM::Bridge::read_libssh2_terminal() {
  char buffer[512];
  memset(buffer, '\0', sizeof(buffer));
  std::string output;
  ssize_t nread = 0;

  // Drain stdout
  for (;;) {
    nread = libssh2_channel_read(channel_, buffer, sizeof(buffer));
    if (nread > 0) {
      output.append(buffer, nread);
      continue;
    }
    if (nread != LIBSSH2_ERROR_EAGAIN) {
      break;
    }
    synchronize_socket();
    msleep(CCM_NAP);
  }

  // Drain stderr
  for (;;) {
    nread = libssh2_channel_read_stderr(channel_, buffer, sizeof(buffer));
    if (nread > 0) {
      output.append(buffer, nread);
      continue;
    }
    if (nread != LIBSSH2_ERROR_EAGAIN) {
      break;
    }
    synchronize_socket();
    msleep(CCM_NAP);
  }

  return output;
}
#endif
// Execute a CCM command and return its output.
//
// "ccm" is prepended to `command`. For a local deployment the command is run
// through utils::Process::execute and its standard output is returned; a
// non-zero exit status raises a BridgeException carrying the standard error.
// For a remote deployment (libssh2 builds only) the command is executed over
// the libssh2 session. Any other deployment type yields an empty string.
std::string CCM::Bridge::execute_ccm_command(const std::vector<std::string>& command) {
  // Prefix the arguments with the CCM executable
  std::vector<std::string> full_command(1, "ccm");
  full_command.insert(full_command.end(), command.begin(), command.end());
  LOG(implode(full_command));

  // Dispatch on the deployment type
  std::string output;
  if (deployment_type_ == DeploymentType::LOCAL) {
#ifdef _WIN32
    if (!is_cassandra()) {
      std::stringstream message;
      message << server_type_.to_string() << " v" << dse_version_.to_string()
              << " cannot be launched on Windows platform";
      throw BridgeException(message.str());
    }
#endif
    utils::Process::Result result = utils::Process::execute(full_command);
    if (result.exit_status != 0) {
      throw BridgeException(result.standard_error);
    }
    output = result.standard_output;
  }
#ifdef CASS_USE_LIBSSH2
  else if (deployment_type_ == DeploymentType::REMOTE) {
    output = execute_libssh2_command(full_command);
    if (!output.empty()) {
      LOG(trim(output));
    }
  }
#endif
  return output;
}
// Get the name of the active cluster.
//
// Delegates to get_available_clusters(active_cluster) and discards the list;
// the result is empty when no cluster was flagged as active.
std::string CCM::Bridge::get_active_cluster() {
  std::string active_cluster;
  get_available_clusters(active_cluster);
  return active_cluster;
}
// Get the list of available clusters; the name of the active cluster is
// determined by the delegated overload but not reported to the caller.
std::vector<std::string> CCM::Bridge::get_available_clusters() {
  std::string unused_active_cluster;
  return get_available_clusters(unused_active_cluster);
}
// Get the list of available clusters and report the active one.
//
// Runs "ccm list" and splits its output with explode(). An entry starting
// with '*' is treated as the active cluster: every '*' is stripped from it,
// the cleaned name replaces the entry in the returned list and is written to
// `active_cluster`. `active_cluster` is left untouched when no entry is
// flagged.
std::vector<std::string> CCM::Bridge::get_available_clusters(std::string& active_cluster) {
  // Ask CCM for the known clusters
  const std::vector<std::string> list_command(1, "list");
  std::vector<std::string> clusters = explode(execute_ccm_command(list_command));

  // Clean up the flagged entry in place and remember it as the active cluster
  for (std::vector<std::string>::iterator entry = clusters.begin(); entry != clusters.end();
       ++entry) {
    if (entry->compare(0, 1, "*") != 0) {
      continue;
    }
    entry->erase(std::remove(entry->begin(), entry->end(), '*'), entry->end());
    active_cluster = *entry;
  }
  return clusters;
}
// Build the cluster name for the requested configuration.
//
// Layout: <cluster_prefix_>_<version with '.' replaced by '-'>_<nodes joined
// by '-'>, followed by the optional suffixes "-vnodes",
// "-password_authenticator", "-ssl" and (only together with SSL)
// "-client_authentication". The version is the Cassandra version when
// is_cassandra() is true and dse_version_ otherwise.
std::string CCM::Bridge::generate_cluster_name(CassVersion cassandra_version,
                                               std::vector<unsigned short> data_center_nodes,
                                               bool with_vnodes, bool is_password_authenticator,
                                               bool is_ssl, bool is_client_authentication) {
  // Pick the version of the server being launched and make it name friendly
  std::string version_tag =
      is_cassandra() ? cassandra_version.to_string(false) : dse_version_.to_string(false);
  std::replace(version_tag.begin(), version_tag.end(), '.', '-');

  // Mandatory part of the name
  std::stringstream name;
  name << cluster_prefix_ << "_" << version_tag << "_"
       << generate_cluster_nodes(data_center_nodes, '-');

  // Optional feature suffixes
  if (with_vnodes) name << "-vnodes";
  if (is_password_authenticator) name << "-password_authenticator";
  if (is_ssl) {
    name << "-ssl";
    if (is_client_authentication) name << "-client_authentication";
  }
  return name.str();
}
// Join the per data center node counts into a single string, placing
// `separator` between consecutive counts (e.g. {3, 2} with ':' -> "3:2").
// An empty vector produces an empty string.
std::string CCM::Bridge::generate_cluster_nodes(std::vector<unsigned short> data_center_nodes,
                                                char separator /* = ':'*/) {
  std::stringstream joined;
  for (std::vector<unsigned short>::size_type index = 0; index < data_center_nodes.size();
       ++index) {
    if (index > 0) {
      joined << separator;
    }
    joined << data_center_nodes[index];
  }
  return joined.str();
}
// Build the "updateconf" command used when creating a cluster.
//
// The resource-limiting overrides are only added when is_cassandra() is true;
// the user defined function / materialized view settings are added based on
// `cassandra_version` regardless of the server type (the 4.0+ settings
// additionally require !is_dse()).
std::vector<std::string>
CCM::Bridge::generate_create_updateconf_command(CassVersion cassandra_version) {
  // TODO: Add SSL setup and client authentication
  std::vector<std::string> arguments(1, "updateconf");

  if (is_cassandra()) {
    // Overrides shared by every Cassandra version
    const char* const common_overrides[] = {
      "--rt=10000",
      "read_request_timeout_in_ms:10000",
      "write_request_timeout_in_ms:10000",
      "request_timeout_in_ms:10000",
      "phi_convict_threshold:16",
      "hinted_handoff_enabled:false",
      "dynamic_snitch_update_interval_in_ms:1000",
      "native_transport_max_threads:1",
      "concurrent_reads:2",
      "concurrent_writes:2",
      "concurrent_compactors:1",
      "compaction_throughput_mb_per_sec:0",
      "key_cache_size_in_mb:0",
      "key_cache_save_period:0",
      "memtable_flush_writers:1",
      "max_hints_delivery_threads:1"
    };
    arguments.insert(arguments.end(), common_overrides,
                     common_overrides + sizeof(common_overrides) / sizeof(common_overrides[0]));

    // Overrides that differ between C* v1.2.x and C* v2.0+
    if (cassandra_version < "2.0.0") {
      arguments.push_back("reduce_cache_sizes_at:0");
      arguments.push_back("reduce_cache_capacity_to:0");
      arguments.push_back("flush_largest_memtables_at:0");
      arguments.push_back("index_interval:512");
    } else {
      arguments.push_back("cas_contention_timeout_in_ms:10000");
      arguments.push_back("file_cache_size_in_mb:0");
    }

    // Override only applied to C* < v2.1
    if (cassandra_version < "2.1.0") {
      arguments.push_back("in_memory_compaction_limit_in_mb:1");
    }

    // Overrides only applied to C* < v4.0
    if (cassandra_version < "4.0.0") {
      arguments.push_back("rpc_min_threads:1");
      arguments.push_back("rpc_max_threads:1");
    }
  }

  // User defined functions (v2.2+)
  if (cassandra_version >= "2.2.0") {
    arguments.push_back("enable_user_defined_functions:true");
  }

  // Scripted user defined functions (v3.0+)
  if (cassandra_version >= "3.0.0") {
    arguments.push_back("enable_scripted_user_defined_functions:true");
  }

  // Materialized views and user defined functions (v4.0+, non-DSE)
  if (cassandra_version >= "4.0.0" && !is_dse()) {
    arguments.push_back("enable_materialized_views:true");
    arguments.push_back("enable_user_defined_functions:true");
  }

  return arguments;
}
std::string CCM::Bridge::generate_dse_workloads(std::vector<DseWorkload> workloads) {
std::string dse_workloads;
for (std::vector<DseWorkload>::iterator iterator = workloads.begin(); iterator != workloads.end();
++iterator) {
dse_workloads += dse_workloads_[*iterator];
if ((iterator + 1) != workloads.end()) {
dse_workloads += ",";
}
}
return dse_workloads;
}
// Build the node name for a node number, i.e. "node" followed by the number
// (e.g. 3 -> "node3").
std::string CCM::Bridge::generate_node_name(unsigned int node) {
  std::ostringstream name;
  name << "node";
  name << node;
  return name.str();
}
unsigned int CCM::Bridge::get_next_available_node() {
ClusterStatus status = cluster_status();
unsigned int next_available_node = status.node_count + 1;
if (next_available_node > CLUSTER_NODE_LIMIT) {
std::stringstream message;
message << "Failed to get next available node; cluster limit of \"" << CLUSTER_NODE_LIMIT
<< "\" nodes reached";
throw BridgeException(message.str());
}
return next_available_node;
}
bool CCM::Bridge::is_node_availabe(unsigned int node) {
std::stringstream ip_address;
ip_address << get_ip_prefix() << node;
return is_node_availabe(ip_address.str());
}
bool CCM::Bridge::is_node_availabe(const std::string& ip_address) {
Socket socket;
try {
socket.establish_connection(ip_address, CASSANDRA_BINARY_PORT);
return true;
} catch (...) {
; // No-op
}
return false;
}
// Return a lowercase copy of `input` (per character, using ::tolower and the
// current C locale).
std::string CCM::Bridge::to_lower(const std::string& input) {
  std::string lowercase = input;
  for (std::string::iterator character = lowercase.begin(); character != lowercase.end();
       ++character) {
    // ::tolower requires a value representable as unsigned char (or EOF);
    // passing a negative plain char is undefined behavior.
    *character = static_cast<char>(::tolower(static_cast<unsigned char>(*character)));
  }
  return lowercase;
}
// Return a copy of `input` with leading and trailing TRIM_DELIMETERS
// characters removed. Empty and delimiter-only input produce an empty string.
std::string CCM::Bridge::trim(const std::string& input) {
  // Locate the last character that is kept
  const std::string::size_type last = input.find_last_not_of(TRIM_DELIMETERS);
  if (last == std::string::npos) {
    return std::string();
  }

  // A kept character exists, so the first one is guaranteed to be found
  const std::string::size_type first = input.find_first_not_of(TRIM_DELIMETERS);
  return input.substr(first, last - first + 1);
}
// Concatenate `elements` into a single string, placing `delimiter` between
// consecutive elements. An empty vector produces an empty string.
std::string CCM::Bridge::implode(const std::vector<std::string>& elements,
                                 const char delimiter /*= ' '*/) {
  std::string joined;
  for (std::vector<std::string>::size_type index = 0; index < elements.size(); ++index) {
    if (index > 0) {
      joined += delimiter;
    }
    joined += elements[index];
  }
  return joined;
}
// Split `input` on `delimiter` into trimmed tokens.
//
// Tokens that are empty before trimming (e.g. produced by consecutive
// delimiters) are skipped; every other token is trimmed and appended.
std::vector<std::string> CCM::Bridge::explode(const std::string& input,
                                              const char delimiter /*= ' '*/) {
  std::vector<std::string> tokens;
  std::istringstream stream(input);
  std::string token;
  while (std::getline(stream, token, delimiter)) {
    if (token.empty()) {
      continue;
    }
    tokens.push_back(trim(token));
  }
  return tokens;
}
// Suspend the calling thread for (at least) the given number of milliseconds.
//
// Uses Sleep() on Windows and nanosleep() elsewhere. An interrupted
// nanosleep() is resumed with the remaining time.
void CCM::Bridge::msleep(unsigned int milliseconds) {
#ifdef _WIN32
  Sleep(milliseconds);
#else
  // Convert the milliseconds into a proper timespec structure
  struct timespec requested;
  requested.tv_sec = static_cast<time_t>(milliseconds / 1000);
  requested.tv_nsec = static_cast<long>(milliseconds % 1000) * 1000000L;

  // nanosleep stores the unslept time back into `requested` when interrupted;
  // only resume on EINTR so that any other failure cannot loop forever.
  while (nanosleep(&requested, &requested) == -1 && errno == EINTR) {
    continue;
  }
#endif
}