blob: d7ef6a678b20a9467257e5ef55e8e8ab985ad71e [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "integration.hpp"
#include "options.hpp"
#include "driver_info.hpp"
#include "get_time.hpp"
#include "murmur3.hpp"
#include <algorithm>
#include <cstdarg>
#include <iostream>
#include <sys/stat.h>
#define FORMAT_BUFFER_SIZE 10240
#define ENTITY_MAXIMUM_LENGTH 48
#define SIMPLE_KEYSPACE_FORMAT "CREATE KEYSPACE IF NOT EXISTS %s WITH replication = %s"
#define REPLICATION_STRATEGY "{ 'class': %s }"
#define SELECT_SERVER_VERSION "SELECT release_version FROM system.local"
#define LOGGER_MAXIMUM_WAIT_TIME_MS 10000u
#define LOGGER_WAIT_FOR_NAP_MS 100
// Initialize static variables
bool Integration::skipped_message_displayed_ = false;
using namespace datastax::internal;
using namespace ::testing;
Integration::Integration()
: ccm_(NULL)
, session_()
, keyspace_name_("")
, table_name_("")
, system_schema_keyspaces_("system.schema_keyspaces")
, uuid_generator_()
, server_version_(Options::server_version())
, number_dc1_nodes_(1)
, number_dc2_nodes_(0)
, replication_factor_(0)
, replication_strategy_("")
, contact_points_("")
, is_password_authenticator_(false)
, is_client_authentication_(false)
, is_ssl_(false)
, is_with_vnodes_(false)
, is_randomized_contact_points_(false)
, is_schema_metadata_(false)
, is_ccm_requested_(true)
, is_ccm_start_requested_(true)
, is_ccm_start_node_individually_(false)
, is_session_requested_(true)
, is_keyspace_change_requested_(true)
, is_test_chaotic_(false)
, protocol_version_(CASS_PROTOCOL_VERSION_V4)
, create_keyspace_query_("")
, start_time_(0ull) {
// Determine if the schema keyspaces table should be updated
// TODO: Make cass_version (and dse_version) available for all tests
CCM::CassVersion cass_version = server_version_;
if (!Options::is_cassandra()) {
cass_version = static_cast<CCM::DseVersion>(cass_version).get_cass_version();
}
if (cass_version >= "3.0.0") {
system_schema_keyspaces_ = "system_schema.keyspaces";
}
// Get the name of the test and the case/suite it belongs to
const TestInfo* test_information = UnitTest::GetInstance()->current_test_info();
test_name_ = test_information->name();
// Determine if this is a typed test (e.g. ends in a number)
const char* type_param = test_information->type_param();
if (type_param) {
std::vector<std::string> tokens = explode(test_information->test_case_name(), '/');
for (std::vector<std::string>::const_iterator iterator = tokens.begin();
iterator < tokens.end(); ++iterator) {
std::string token = *iterator;
// Determine if we are looking at the last token
if ((iterator + 1) == tokens.end()) {
size_t number = 0;
std::stringstream tokenStream(token);
if (!(tokenStream >> number).fail()) {
std::vector<std::string> type_param_tokens = explode(type_param, ':');
size_t size = type_param_tokens.size();
test_case_name_ += test::Utils::replace_all(type_param_tokens[size - 1], ">", "");
}
} else {
test_case_name_ += token + "_";
}
}
} else {
test_case_name_ = test_information->test_case_name();
}
// Determine if file logging should be enabled for the integration tests
if (Options::log_tests()) {
logger_.initialize(test_case_name_, test_name_);
}
}
Integration::~Integration() {
try {
session_.close(false);
} catch (...) {
}
// Reset the skipped message displayed state
skipped_message_displayed_ = false;
}
void Integration::SetUp() {
// Initialize the DSE workload (iff not set)
if (dse_workload_.empty()) {
dse_workload_.push_back(CCM::DSE_WORKLOAD_CASSANDRA);
}
// Generate the default settings for most tests (handles overridden values)
keyspace_name_ = default_keyspace();
table_name_ = default_table();
if (replication_factor_ == 0) {
replication_factor_ = default_replication_factor();
}
replication_strategy_ = default_replication_strategy();
// Generate the keyspace query
create_keyspace_query_ =
format_string(SIMPLE_KEYSPACE_FORMAT, keyspace_name_.c_str(), replication_strategy_.c_str());
// Create the data center nodes vector
std::vector<unsigned short> data_center_nodes;
data_center_nodes.push_back(number_dc1_nodes_);
data_center_nodes.push_back(number_dc2_nodes_);
if (is_ccm_requested_) {
try {
// Create and start the CCM cluster (if not already created)
ccm_ = new CCM::Bridge(
server_version_, Options::use_git(), Options::branch_tag(), Options::use_install_dir(),
Options::install_dir(), Options::server_type(), dse_workload_, Options::cluster_prefix(),
Options::dse_credentials(), Options::dse_username(), Options::dse_password(),
Options::deployment_type(), Options::authentication_type(), Options::host(),
Options::port(), Options::username(), Options::password(), Options::public_key(),
Options::private_key(), Options::is_verbose_ccm());
if (ccm_->create_cluster(data_center_nodes, is_with_vnodes_, is_password_authenticator_,
is_ssl_, is_client_authentication_)) {
if (is_ccm_start_requested_) {
if (is_ccm_start_node_individually_) {
for (unsigned short node = 1; node <= (number_dc1_nodes_ + number_dc2_nodes_); ++node) {
if (is_password_authenticator_) {
ccm_->start_node(node, "-Dcassandra.superuser_setup_delay_ms=0");
} else {
ccm_->start_node(node);
}
}
} else {
if (is_password_authenticator_) {
ccm_->start_cluster("-Dcassandra.superuser_setup_delay_ms=0");
} else {
ccm_->start_cluster();
}
}
}
}
// Generate the default contact points
contact_points_ =
generate_contact_points(ccm_->get_ip_prefix(), number_dc1_nodes_ + number_dc2_nodes_);
// Determine if the session connection should be established
if (is_session_requested_ && is_ccm_start_requested_) {
connect();
}
} catch (CCM::BridgeException be) {
// Issue creating the CCM bridge instance (force failure)
FAIL() << be.what();
}
}
}
void Integration::TearDown() {
// Restart and resume all stopped and paused nodes
if (!is_test_chaotic_) { // No need to restart as cluster will be destroyed
for (std::vector<unsigned int>::iterator it = stopped_nodes_.begin(),
end = stopped_nodes_.end();
it != end; ++it) {
TEST_LOG("Restarting Node Stopped in " << test_name_ << ": " << *it);
ccm_->start_node(*it);
}
for (std::vector<unsigned int>::iterator it = paused_nodes_.begin(), end = paused_nodes_.end();
it != end; ++it) {
TEST_LOG("Resuming Node Paused in " << test_name_ << ": " << *it);
ccm_->resume_node(*it);
}
}
stopped_nodes_.clear();
paused_nodes_.clear();
// Drop keyspace for integration test (may or may have not been created)
if (!is_test_chaotic_) { // No need to drop keyspace as cluster will be destroyed
std::stringstream use_keyspace_query;
use_keyspace_query << "DROP KEYSPACE " << keyspace_name_;
try {
session_.execute(use_keyspace_query.str(), CASS_CONSISTENCY_ANY, false, false);
} catch (...) {
}
}
// Determine if the CCM cluster should be destroyed
if (is_test_chaotic_) {
// Destroy the current cluster and reset the chaos flag for the next test
if (!Options::keep_clusters()) {
ccm_->remove_cluster();
} else {
TEST_LOG_ERROR("Chaotic test detected and keep clusters requested. Cluster will not be "
"removed and may result in later failures.");
}
is_test_chaotic_ = false;
}
}
std::string Integration::default_keyspace() {
if (!keyspace_name_.empty()) {
return keyspace_name_;
}
// Clean up the initial keyspace name (remove category information)
keyspace_name_ = to_lower(test_case_name_) + "_" + to_lower(test_name_);
keyspace_name_ = replace_all(keyspace_name_, "tests",
""); // TODO: Rename integration tests (remove 's' or add 's')
keyspace_name_ = replace_all(keyspace_name_, "test", "");
keyspace_name_ = replace_all(keyspace_name_, "integration", "");
for (TestCategory::iterator iterator = TestCategory::begin(); iterator != TestCategory::end();
++iterator) {
keyspace_name_ = replace_all(keyspace_name_, "_" + to_lower(iterator->name()) + "_", "");
}
// Generate the keyspace name
maybe_shrink_name(keyspace_name_);
return keyspace_name_;
}
unsigned short Integration::default_replication_factor() {
// Calculate and return the default replication factor
return (number_dc1_nodes_ % 2 == 0) ? number_dc1_nodes_ / 2 : (number_dc1_nodes_ + 1) / 2;
}
std::string Integration::default_replication_strategy() {
// Determine the replication strategy
std::stringstream replication_strategy_s;
if (number_dc2_nodes_ > 0) {
replication_strategy_s << "'NetworkTopologyStrategy', 'dc1': " << number_dc1_nodes_ << ", "
<< "'dc2': " << number_dc2_nodes_;
} else {
replication_strategy_s << "'SimpleStrategy', 'replication_factor': ";
// Ensure the replication factor has not been overridden or already set
if (replication_factor_ == 0) {
replication_factor_ = default_replication_factor();
}
replication_strategy_s << replication_factor_;
}
// Return the default replication strategy
std::string replication_strategy = replication_strategy_s.str();
return format_string(REPLICATION_STRATEGY, replication_strategy.c_str());
}
std::string Integration::default_select_all() {
std::stringstream cql;
cql << "SELECT * FROM " << default_keyspace() << "." << default_table();
return cql.str();
}
int64_t Integration::default_select_count() {
Result result = session_.execute(format_string(SELECT_COUNT_FORMAT, table_name_.c_str()));
EXPECT_EQ(CASS_OK, result.error_code()) << "Unable to get Row Count: " << result.error_message();
return result.first_row().next().as<BigInteger>().value();
}
std::string Integration::default_table() {
if (!table_name_.empty()) {
return table_name_;
}
table_name_ = to_lower(test_name_);
table_name_ = replace_all(table_name_, "integration_", "");
maybe_shrink_name(table_name_);
return table_name_;
}
void Integration::drop_table(const std::string& table_name) {
// Drop table from the current keyspace
std::stringstream drop_table_query;
drop_table_query << "DROP TABLE " << table_name;
session_.execute(drop_table_query.str(), CASS_CONSISTENCY_ANY, false, false);
}
void Integration::drop_type(const std::string& type_name) {
// Drop type from the current keyspace
std::stringstream drop_type_query;
drop_type_query << "DROP TYPE " << type_name;
session_.execute(drop_type_query.str(), CASS_CONSISTENCY_ANY, false, false);
}
bool Integration::use_keyspace(const std::string& keyspace_name) {
std::stringstream use_keyspace_query;
use_keyspace_query << "USE " << keyspace_name;
session_.execute(use_keyspace_query.str());
if (this->HasFailure()) {
return false;
}
return true;
}
void Integration::connect(Cluster cluster) {
// Establish the session connection
cluster_ = cluster;
session_ = cluster.connect();
CHECK_FAILURE;
// Update the server version if branch_tag was specified
if (Options::use_git() && !Options::branch_tag().empty()) {
if (Options::is_ddac()) {
FAIL() << "Unable to build DDAC from Branch/Tag";
return;
}
if (Options::is_dse()) {
server_version_ = ccm_->get_dse_version();
} else {
server_version_ = ccm_->get_cassandra_version();
}
TEST_LOG("Branch/Tag Option was Used: Retrieved server version is "
<< server_version_.to_string());
}
// Create the keyspace for the integration test
session_.execute(create_keyspace_query_);
CHECK_FAILURE;
// Update the session to use the new keyspace by default
if (is_keyspace_change_requested_) {
use_keyspace(keyspace_name_);
}
}
void Integration::connect() {
// Create the cluster configuration and establish the session connection
cluster_ = default_cluster();
connect(cluster_);
}
test::driver::Cluster Integration::default_cluster(bool is_with_default_contact_points /*= true*/) {
// Create the default cluster object
Cluster cluster = Cluster::build()
.with_randomized_contact_points(is_randomized_contact_points_)
.with_schema_metadata(is_schema_metadata_);
if (is_with_default_contact_points) {
cluster.with_contact_points(contact_points_);
}
cluster.with_protocol_version(protocol_version_);
// Assign the execution profiles to the cluster object (if available)
for (ExecutionProfile::Map::iterator it = profiles_.begin(); it != profiles_.end(); ++it) {
cluster.with_execution_profile(it->first, it->second);
}
return cluster;
}
void Integration::enable_cluster_tracing(bool enable /*= true*/) {
std::vector<std::string> active_nodes = ccm_->cluster_ip_addresses();
for (std::vector<std::string>::iterator iterator = active_nodes.begin();
iterator != active_nodes.end(); ++iterator) {
// Get the node number from the IP address
std::string node_ip_address = *iterator;
std::stringstream node_value;
node_value << node_ip_address[node_ip_address.length() - 1];
// Enable tracing on the node
unsigned int node;
node_value >> node;
ccm_->enable_node_trace(node);
}
}
bool Integration::decommission_node(unsigned int node, bool is_force /*= false */) {
// Decommission the requested node
bool status = ccm_->decommission_node(node, is_force);
if (status) {
// Indicate the test is chaotic
is_test_chaotic_ = true;
}
return status;
}
bool Integration::force_decommission_node(unsigned int node) {
return decommission_node(node, true);
}
bool Integration::start_node(unsigned int node) {
// Start the requested node and ensure paused nodes are ignored
std::vector<unsigned>::iterator it = std::find(paused_nodes_.begin(), paused_nodes_.end(), node);
if (it == paused_nodes_.end() && ccm_->is_node_down(node, true)) {
bool status = ccm_->start_node(node);
it = std::find(stopped_nodes_.begin(), stopped_nodes_.end(), node);
if (it != stopped_nodes_.end()) {
stopped_nodes_.erase(it);
}
return status;
}
return false;
}
bool Integration::stop_node(unsigned int node, bool is_kill /*= false*/) {
if (ccm_->is_node_up(node, true)) {
bool status = ccm_->stop_node(node, is_kill);
if (status) {
stopped_nodes_.push_back(node);
}
return status;
}
return false;
}
bool Integration::pause_node(unsigned int node) {
std::vector<unsigned>::iterator it = std::find(paused_nodes_.begin(), paused_nodes_.end(), node);
if (it == paused_nodes_.end() && ccm_->is_node_up(node, true)) {
ccm_->pause_node(node);
paused_nodes_.push_back(node);
return true;
}
return false;
}
bool Integration::resume_node(unsigned int node) {
std::vector<unsigned>::iterator it = std::find(paused_nodes_.begin(), paused_nodes_.end(), node);
if (it != paused_nodes_.end()) {
ccm_->resume_node(node);
paused_nodes_.erase(it);
return true;
}
return false;
}
std::string Integration::generate_contact_points(const std::string& ip_prefix,
size_t number_of_nodes) {
// Iterate over the total number of nodes to create the contact list
std::vector<std::string> contact_points;
for (size_t i = 1; i <= number_of_nodes; ++i) {
std::stringstream contact_point;
contact_point << ip_prefix << i;
contact_points.push_back(contact_point.str());
}
return implode(contact_points, ',');
}
std::string Integration::format_string(const char* format, ...) const {
// Create a buffer for the formatting of the string
char buffer[FORMAT_BUFFER_SIZE] = { '\0' };
// Parse the arguments into the buffer
va_list args;
va_start(args, format);
vsnprintf(buffer, FORMAT_BUFFER_SIZE, format, args);
va_end(args);
// Return the formatted string
return buffer;
}
void Integration::maybe_shrink_name(std::string& name) {
if (name.size() > ENTITY_MAXIMUM_LENGTH) {
// Update the name with a UUID (first portions of v4 UUID)
std::vector<std::string> uuid_octets = explode(uuid_generator_.generate_timeuuid().str(), '-');
std::string id = uuid_octets[0] + uuid_octets[3];
name = name.substr(0, ENTITY_MAXIMUM_LENGTH - id.size()) + id;
}
}
bool Integration::wait_for_logger(size_t expected_count) {
start_timer();
while (elapsed_time() < LOGGER_MAXIMUM_WAIT_TIME_MS && logger_.count() < expected_count) {
msleep(LOGGER_WAIT_FOR_NAP_MS);
}
return logger_.count() >= expected_count;
}
int64_t Integration::murmur3_hash(const std::string& value) {
return MurmurHash3_x64_128(value.data(), value.size(), 0);
}
uint64_t Integration::time_since_epoch_in_ms() { return get_time_since_epoch_ms(); }
uint64_t Integration::time_since_epoch_us() { return get_time_since_epoch_us(); }
std::string Integration::driver_name() { return datastax::internal::driver_name(); }
std::string Integration::driver_version() { return datastax::internal::driver_version(); }