blob: 2f67206413fc1063d1a98fffcc28735172bca3f6 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "cluster.hpp"
#include "constants.hpp"
#include "dc_aware_policy.hpp"
#include "external.hpp"
#include "logger.hpp"
#include "resolver.hpp"
#include "round_robin_policy.hpp"
#include "speculative_execution.hpp"
#include "utils.hpp"
using namespace datastax;
using namespace datastax::internal::core;
namespace datastax { namespace internal { namespace core {
/**
* A task for initiating the cluster close process.
*/
class ClusterRunClose : public Task {
public:
  ClusterRunClose(const Cluster::Ptr& cluster)
      : cluster_(cluster) {}
  // Runs on the event loop thread; starts the close sequence.
  void run(EventLoop* event_loop) { cluster_->internal_close(); }

private:
  Cluster::Ptr cluster_; // Keeps the cluster alive until the task runs
};

/**
 * A task for marking a node as UP.
 */
class ClusterNotifyUp : public Task {
public:
  ClusterNotifyUp(const Cluster::Ptr& cluster, const Address& address)
      : cluster_(cluster)
      , address_(address) {}
  void run(EventLoop* event_loop) { cluster_->internal_notify_host_up(address_); }

private:
  Cluster::Ptr cluster_;
  Address address_;
};

/**
 * A task for marking a node as DOWN.
 */
class ClusterNotifyDown : public Task {
public:
  ClusterNotifyDown(const Cluster::Ptr& cluster, const Address& address)
      : cluster_(cluster)
      , address_(address) {}
  void run(EventLoop* event_loop) { cluster_->internal_notify_host_down(address_); }

private:
  Cluster::Ptr cluster_;
  Address address_;
};

/**
 * A task for flushing any events recorded during startup to the listener.
 */
class ClusterStartEvents : public Task {
public:
  ClusterStartEvents(const Cluster::Ptr& cluster)
      : cluster_(cluster) {}
  void run(EventLoop* event_loop) { cluster_->internal_start_events(); }

private:
  Cluster::Ptr cluster_;
};

/**
 * A task for starting periodic client monitor (insights) reporting.
 */
class ClusterStartClientMonitor : public Task {
public:
  ClusterStartClientMonitor(const Cluster::Ptr& cluster, const String& client_id,
                            const String& session_id, const Config& config)
      : cluster_(cluster)
      , client_id_(client_id)
      , session_id_(session_id)
      , config_(config) {}
  void run(EventLoop* event_loop) {
    cluster_->internal_start_monitor_reporting(client_id_, session_id_, config_);
  }

private:
  Cluster::Ptr cluster_;
  String client_id_;
  String session_id_;
  Config config_;
};
/**
* A no operation cluster listener. This is used when a listener is not set.
*/
class NopClusterListener : public ClusterListener {
public:
  // Intentionally empty overrides; base-class defaults handle any callbacks
  // not overridden here (e.g. on_host_maybe_up/on_host_ready).
  virtual void on_connect(Cluster* cluster) {}

  virtual void on_host_up(const Host::Ptr& host) {}
  virtual void on_host_down(const Host::Ptr& host) {}

  virtual void on_host_added(const Host::Ptr& host) {}
  virtual void on_host_removed(const Host::Ptr& host) {}

  virtual void on_token_map_updated(const TokenMap::Ptr& token_map) {}

  virtual void on_close(Cluster* cluster) {}
};
}}} // namespace datastax::internal::core
// Dispatches a single recorded event to the matching listener callback.
void ClusterEvent::process_event(const ClusterEvent& event, ClusterListener* listener) {
  switch (event.type) {
    case HOST_UP:
      listener->on_host_up(event.host);
      break;
    case HOST_DOWN:
      listener->on_host_down(event.host);
      break;
    case HOST_ADD:
      listener->on_host_added(event.host);
      break;
    case HOST_REMOVE:
      listener->on_host_removed(event.host);
      break;
    case HOST_MAYBE_UP:
      listener->on_host_maybe_up(event.host);
      break;
    case HOST_READY:
      listener->on_host_ready(event.host);
      break;
    case TOKEN_MAP_UPDATE:
      // Token map events carry a token map instead of a host.
      listener->on_token_map_updated(event.token_map);
      break;
  }
}
// Replays a batch of recorded events, in order, to the given listener.
void ClusterEvent::process_events(const ClusterEvent::Vec& events, ClusterListener* listener) {
  typedef ClusterEvent::Vec::const_iterator EventIterator;
  for (EventIterator event_it = events.begin(); event_it != events.end(); ++event_it) {
    process_event(*event_it, listener);
  }
}
// Shared fallback listener used whenever no listener is set; avoids NULL
// checks before every listener_ callback.
static NopClusterListener nop_cluster_listener__;
LockedHostMap::LockedHostMap(const HostMap& hosts)
    : hosts_(hosts) {
  uv_mutex_init(&mutex_);
}

LockedHostMap::~LockedHostMap() { uv_mutex_destroy(&mutex_); }

// NOTE(review): find() does not take the mutex itself; callers such as get()
// lock first. Direct external callers appear to run on the event loop thread —
// confirm before adding new call sites.
LockedHostMap::const_iterator LockedHostMap::find(const Address& address) const {
  HostMap::const_iterator it = hosts_.find(address);
  if (it == hosts_.end()) {
    // If this is from an event (not SNI) and we're using SNI addresses then fallback to using the
    // "rpc_address" to compare.
    for (HostMap::const_iterator i = hosts_.begin(), end = hosts_.end(); i != end; ++i) {
      if (i->second->rpc_address() == address) {
        return i;
      }
    }
  }
  // Falls through to end() when neither the key nor any rpc_address matches.
  return it;
}

// Thread-safe lookup; returns a null pointer when the host is unknown.
Host::Ptr LockedHostMap::get(const Address& address) const {
  ScopedMutex l(&mutex_);
  const_iterator it = find(address);
  if (it == end()) return Host::Ptr();
  return it->second;
}

void LockedHostMap::erase(const Address& address) {
  ScopedMutex l(&mutex_);
  hosts_.erase(address);
}

// NOTE(review): the returned reference escapes the lock's scope; safe only if
// the caller is the sole mutator (event loop thread) — confirm.
Host::Ptr& LockedHostMap::operator[](const Address& address) {
  ScopedMutex l(&mutex_);
  return hosts_[address];
}

// Replaces the entire map contents under the lock.
LockedHostMap& LockedHostMap::operator=(const HostMap& hosts) {
  ScopedMutex l(&mutex_);
  hosts_ = hosts;
  return *this;
}
// Default settings: round-robin load balancing, exponential reconnection, and
// compile-time default port/prepare/flush constants.
ClusterSettings::ClusterSettings()
    : load_balancing_policy(new RoundRobinPolicy())
    , port(CASS_DEFAULT_PORT)
    , reconnection_policy(new ExponentialReconnectionPolicy())
    , prepare_on_up_or_add_host(CASS_DEFAULT_PREPARE_ON_UP_OR_ADD_HOST)
    , max_prepares_per_flush(CASS_DEFAULT_MAX_PREPARES_PER_FLUSH)
    , disable_events_on_startup(false)
    , cluster_metadata_resolver_factory(new DefaultClusterMetadataResolverFactory()) {
  // The single default policy is also the full policy list.
  load_balancing_policies.push_back(load_balancing_policy);
}
// Settings derived from a user-supplied Config. Note: max_prepares_per_flush
// and disable_events_on_startup keep their defaults; they are not read from
// the config here.
ClusterSettings::ClusterSettings(const Config& config)
    : control_connection_settings(config)
    , load_balancing_policy(config.load_balancing_policy())
    , load_balancing_policies(config.load_balancing_policies())
    , port(config.port())
    , reconnection_policy(config.reconnection_policy())
    , prepare_on_up_or_add_host(config.prepare_on_up_or_add_host())
    , max_prepares_per_flush(CASS_DEFAULT_MAX_PREPARES_PER_FLUSH)
    , disable_events_on_startup(false)
    , cluster_metadata_resolver_factory(config.cluster_metadata_resolver_factory()) {}
// Takes over an already-established control connection. The constructor
// self-references (inc_ref) so the object stays alive until handle_close()
// calls the matching dec_ref().
Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener,
                 EventLoop* event_loop, const Host::Ptr& connected_host, const HostMap& hosts,
                 const ControlConnectionSchema& schema,
                 const LoadBalancingPolicy::Ptr& load_balancing_policy,
                 const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
                 const StringMultimap& supported_options, const ClusterSettings& settings)
    : connection_(connection)
    , listener_(listener ? listener : &nop_cluster_listener__) // never NULL
    , event_loop_(event_loop)
    , load_balancing_policy_(load_balancing_policy)
    , load_balancing_policies_(load_balancing_policies)
    , settings_(settings)
    , is_closing_(false)
    , connected_host_(connected_host)
    , hosts_(hosts)
    , local_dc_(local_dc)
    , supported_options_(supported_options)
    // When events are disabled on startup, record them for later replay by
    // internal_start_events().
    , is_recording_events_(settings.disable_events_on_startup) {
  inc_ref();
  connection_->set_listener(this);

  // Query plan used to pick hosts for control connection reconnects.
  query_plan_.reset(load_balancing_policy_->new_query_plan("", NULL, NULL));

  update_schema(schema);
  update_token_map(hosts, connected_host_->partitioner(), schema);

  listener_->on_reconnect(this);
}
// The methods below do not act directly; each posts a task to the cluster's
// event loop (presumably so they can be called from other threads — confirm).

void Cluster::close() { event_loop_->add(new ClusterRunClose(Ptr(this))); }

void Cluster::notify_host_up(const Address& address) {
  event_loop_->add(new ClusterNotifyUp(Ptr(this), address));
}

void Cluster::notify_host_down(const Address& address) {
  event_loop_->add(new ClusterNotifyDown(Ptr(this), address));
}

void Cluster::start_events() { event_loop_->add(new ClusterStartEvents(Ptr(this))); }

void Cluster::start_monitor_reporting(const String& client_id, const String& session_id,
                                      const Config& config) {
  event_loop_->add(new ClusterStartClientMonitor(Ptr(this), client_id, session_id, config));
}
// Snapshot of the current schema metadata.
Metadata::SchemaSnapshot Cluster::schema_snapshot() { return metadata_.schema_snapshot(); }

// Thread-safe host lookup (goes through the locked host map).
Host::Ptr Cluster::find_host(const Address& address) const { return hosts_.get(address); }

// Cached prepared-statement metadata, keyed by statement id.
PreparedMetadata::Entry::Ptr Cluster::prepared(const String& id) const {
  return prepared_metadata_.get(id);
}

void Cluster::prepared(const String& id, const PreparedMetadata::Entry::Ptr& entry) {
  prepared_metadata_.set(id, entry);
}
// Builds the subset of known hosts that no load balancing policy ignores.
HostMap Cluster::available_hosts() const {
  HostMap result;
  HostMap::const_iterator host_it = hosts_.begin();
  const HostMap::const_iterator last = hosts_.end();
  for (; host_it != last; ++host_it) {
    if (is_host_ignored(host_it->second)) continue; // Skip ignored hosts
    result[host_it->first] = host_it->second;
  }
  return result;
}
// Installs a listener; a NULL argument falls back to the shared no-op
// listener so listener_ is never NULL.
void Cluster::set_listener(ClusterListener* listener) {
  if (listener) {
    listener_ = listener;
  } else {
    listener_ = &nop_cluster_listener__;
  }
}
// Mark-and-sweep reconciliation: hosts present in `hosts` but not in hosts_
// are added; hosts in hosts_ but missing from `hosts` are removed.
void Cluster::update_hosts(const HostMap& hosts) {
  // Update the hosts and properly notify the listener
  HostMap existing(hosts_); // Working copy; entries are erased as visited

  for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
    HostMap::iterator find_it = existing.find(it->first);
    if (find_it != existing.end()) {
      existing.erase(find_it); // Already exists mark as visited
    } else {
      notify_host_add(it->second); // A new host has been added
    }
  }

  // Any hosts that existed before, but aren't in the new hosts
  // need to be marked as removed.
  for (HostMap::const_iterator it = existing.begin(), end = existing.end(); it != end; ++it) {
    notify_host_remove(it->first);
  }
}
// Rebuilds schema metadata from a full control connection snapshot. Updates
// are applied to the "back" copy, then swapped to the front atomically.
void Cluster::update_schema(const ControlConnectionSchema& schema) {
  metadata_.clear_and_update_back(connection_->server_version());

  // Each component is optional; only present result sets are applied.
  if (schema.keyspaces) {
    metadata_.update_keyspaces(schema.keyspaces.get(), false);
  }

  if (schema.tables) {
    metadata_.update_tables(schema.tables.get());
  }

  if (schema.views) {
    metadata_.update_views(schema.views.get());
  }

  if (schema.columns) {
    metadata_.update_columns(schema.columns.get());
  }

  if (schema.indexes) {
    metadata_.update_indexes(schema.indexes.get());
  }

  if (schema.user_types) {
    metadata_.update_user_types(schema.user_types.get());
  }

  if (schema.functions) {
    metadata_.update_functions(schema.functions.get());
  }

  if (schema.aggregates) {
    metadata_.update_aggregates(schema.aggregates.get());
  }

  // Virtual keyspaces/tables/columns (second argument `true` marks virtual).
  if (schema.virtual_keyspaces) {
    metadata_.update_keyspaces(schema.virtual_keyspaces.get(), true);
  }

  if (schema.virtual_tables) {
    metadata_.update_tables(schema.virtual_tables.get());
  }

  if (schema.virtual_columns) {
    metadata_.update_columns(schema.virtual_columns.get());
  }

  metadata_.swap_to_back_and_update_front();
}
// Rebuilds the token map from scratch when token-aware routing is enabled and
// keyspace metadata is available; otherwise leaves token_map_ untouched.
void Cluster::update_token_map(const HostMap& hosts, const String& partitioner,
                               const ControlConnectionSchema& schema) {
  if (settings_.control_connection_settings.use_token_aware_routing && schema.keyspaces) {
    // Create a new token map and populate it
    token_map_ = TokenMap::from_partitioner(partitioner);
    if (!token_map_) {
      return; // Partitioner is not supported
    }
    token_map_->add_keyspaces(connection_->server_version(), schema.keyspaces.get());
    for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) {
      token_map_->add_host(it->second);
    }
    token_map_->build();
  }
}
// All hosts from the cluster are included in the host map and in the load
// balancing policies (LBP) so that LBPs return the correct host distance (esp.
// important for DC-aware). This method prevents connection pools from being
// created to ignored hosts.
// Delegates to the free function that checks the host's distance against
// every configured load balancing policy (see comment above).
bool Cluster::is_host_ignored(const Host::Ptr& host) const {
  return core::is_host_ignored(load_balancing_policies_, host);
}
// Schedules the next control connection reconnect attempt using the
// reconnection policy's delay; a zero delay reconnects immediately.
void Cluster::schedule_reconnect() {
  if (!reconnection_schedule_) {
    // Lazily created; reset to NULL again on successful reconnect.
    reconnection_schedule_.reset(settings_.reconnection_policy->new_reconnection_schedule());
  }

  uint64_t delay_ms = reconnection_schedule_->next_delay_ms();
  if (delay_ms > 0) {
    timer_.start(connection_->loop(), delay_ms,
                 bind_callback(&Cluster::on_schedule_reconnect, this));
  } else {
    handle_schedule_reconnect();
  }
}
// Timer callback: the reconnect delay has elapsed.
void Cluster::on_schedule_reconnect(Timer* timer) { handle_schedule_reconnect(); }
// Attempts to reconnect to the next host in the query plan; when the plan is
// exhausted a fresh plan is computed and the reconnect is rescheduled.
void Cluster::handle_schedule_reconnect() {
  const Host::Ptr& host = query_plan_->compute_next();
  if (host) {
    reconnector_.reset(new ControlConnector(host, connection_->protocol_version(),
                                            bind_callback(&Cluster::on_reconnect, this)));
    reconnector_->with_settings(settings_.control_connection_settings)
        ->connect(connection_->loop());
  } else {
    // No more hosts, refresh the query plan and schedule a re-connection
    LOG_TRACE("Control connection query plan has no more hosts. "
              "Reset query plan and schedule reconnect");
    query_plan_.reset(load_balancing_policy_->new_query_plan("", NULL, NULL));
    schedule_reconnect();
  }
}
// Control connector callback. On success: adopt the new connection, reconcile
// hosts/schema/token map and notify the listener. On failure: retry, unless
// the attempt was canceled (by internal_close) or the cluster is closing.
void Cluster::on_reconnect(ControlConnector* connector) {
  reconnector_.reset();

  if (is_closing_) {
    // A close was requested while the reconnect was in flight.
    handle_close();
    return;
  }

  if (connector->is_ok()) {
    connection_ = connector->release_connection();
    connection_->set_listener(this);

    // Incrementally update the hosts (notifying the listener)
    update_hosts(connector->hosts());

    // Get the newly connected host
    connected_host_ = hosts_[connection_->address()];
    assert(connected_host_ && "Connected host not found in hosts map");

    update_schema(connector->schema());
    update_token_map(connector->hosts(), connected_host_->partitioner(), connector->schema());

    // Notify the listener that we've built a new token map
    if (token_map_) {
      notify_or_record(ClusterEvent(token_map_));
    }

    LOG_INFO("Control connection connected to %s", connected_host_->address_string().c_str());
    listener_->on_reconnect(this);
    // Successful reconnect resets the backoff schedule.
    reconnection_schedule_.reset();
  } else if (!connector->is_canceled()) {
    LOG_ERROR(
        "Unable to reestablish a control connection to host %s because of the following error: %s",
        connector->address().to_string().c_str(), connector->error_message().c_str());
    schedule_reconnect();
  }
}
// Begins closing. Exactly one of three paths completes the close: a pending
// reconnect timer (close immediately), an in-flight reconnector (cancel; its
// callback finishes the close), or an open connection (its on_close does).
void Cluster::internal_close() {
  is_closing_ = true;

  bool was_timer_running = timer_.is_running();
  timer_.stop();
  monitor_reporting_timer_.stop();

  if (was_timer_running) {
    handle_close();
  } else if (reconnector_) {
    reconnector_->cancel();
  } else if (connection_) {
    connection_->close();
  }
}
// Final close step: release policy handles and the connection, notify the
// listener, then drop the self-reference taken in the constructor.
void Cluster::handle_close() {
  for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                end = load_balancing_policies_.end();
       it != end; ++it) {
    (*it)->close_handles();
  }
  connection_.reset();
  listener_->on_close(this);
  dec_ref(); // Matches inc_ref() in the constructor
}
void Cluster::internal_notify_host_up(const Address& address) {
LockedHostMap::const_iterator it = hosts_.find(address);
if (it == hosts_.end()) {
LOG_WARN("Attempting to mark host %s that we don't have as UP", address.to_string().c_str());
return;
}
Host::Ptr host(it->second);
if (load_balancing_policy_->is_host_up(address)) {
// Already marked up so don't repeat duplicate notifications.
if (!is_host_ignored(host)) {
notify_or_record(ClusterEvent(ClusterEvent::HOST_READY, host));
}
return;
}
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
end = load_balancing_policies_.end();
it != end; ++it) {
(*it)->on_host_up(host);
}
if (is_host_ignored(host)) {
return; // Ignore host
}
if (!prepare_host(host, bind_callback(&Cluster::on_prepare_host_up, Cluster::Ptr(this)))) {
notify_host_up_after_prepare(host);
}
}
// Emits READY then UP once any statement preparation for the host completed.
void Cluster::notify_host_up_after_prepare(const Host::Ptr& host) {
  notify_or_record(ClusterEvent(ClusterEvent::HOST_READY, host));
  notify_or_record(ClusterEvent(ClusterEvent::HOST_UP, host));
}
void Cluster::internal_notify_host_down(const Address& address) {
LockedHostMap::const_iterator it = hosts_.find(address);
if (it == hosts_.end()) {
// Using DEBUG level here because this can happen normally as the result of
// a remove event.
LOG_DEBUG("Attempting to mark host %s that we don't have as DOWN", address.to_string().c_str());
return;
}
Host::Ptr host(it->second);
if (!load_balancing_policy_->is_host_up(address)) {
// Already marked down so don't repeat duplicate notifications.
return;
}
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
end = load_balancing_policies_.end();
it != end; ++it) {
(*it)->on_host_down(address);
}
notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
}
// Flushes events recorded during startup to the listener. Runs at most once
// and is skipped entirely while the cluster is closing.
void Cluster::internal_start_events() {
  // Ignore if closing or already processed events
  if (is_closing_ || !is_recording_events_) return;

  is_recording_events_ = false;
  ClusterEvent::process_events(recorded_events_, listener_);
  recorded_events_.clear();
}
void Cluster::internal_start_monitor_reporting(const String& client_id, const String& session_id,
const Config& config) {
monitor_reporting_.reset(create_monitor_reporting(client_id, session_id, config));
if (!is_closing_ && monitor_reporting_->interval_ms(connection_->dse_server_version()) > 0) {
monitor_reporting_->send_startup_message(connection_->connection(), config, available_hosts(),
load_balancing_policies_);
monitor_reporting_timer_.start(
event_loop_->loop(), monitor_reporting_->interval_ms(connection_->dse_server_version()),
bind_callback(&Cluster::on_monitor_reporting, this));
}
}
// Periodic timer callback: sends a status message and re-arms the timer
// unless the cluster is closing.
void Cluster::on_monitor_reporting(Timer* timer) {
  if (!is_closing_) {
    monitor_reporting_->send_status_message(connection_->connection(), available_hosts());
    // Hoisted: the interval was previously recomputed inline in the start call.
    const uint64_t interval_ms =
        monitor_reporting_->interval_ms(connection_->dse_server_version());
    monitor_reporting_timer_.start(event_loop_->loop(), interval_ms,
                                   bind_callback(&Cluster::on_monitor_reporting, this));
  }
}
// Adds (or replaces) a host: policies are informed, and for non-ignored hosts
// statements are optionally prepared before the token map update and HOST_ADD
// event fire.
void Cluster::notify_host_add(const Host::Ptr& host) {
  LockedHostMap::const_iterator host_it = hosts_.find(host->address());

  if (host_it != hosts_.end()) {
    LOG_WARN("Attempting to add host %s that we already have", host->address_string().c_str());
    // If an entry already exists then notify that the node has been removed
    // then re-add it.
    for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                  end = load_balancing_policies_.end();
         it != end; ++it) {
      (*it)->on_host_removed(host_it->second);
    }
    // NOTE(review): the REMOVE event carries the new `host`, while the
    // policies above were given the old `host_it->second` — confirm this
    // asymmetry is intentional.
    notify_or_record(ClusterEvent(ClusterEvent::HOST_REMOVE, host));
  }

  hosts_[host->address()] = host;

  for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
                                                end = load_balancing_policies_.end();
       it != end; ++it) {
    (*it)->on_host_added(host);
  }

  if (is_host_ignored(host)) {
    return; // Ignore host
  }

  // Emit ADD immediately when statement preparation isn't started.
  if (!prepare_host(host, bind_callback(&Cluster::on_prepare_host_add, Cluster::Ptr(this)))) {
    notify_host_add_after_prepare(host);
  }
}
// Rebuilds the token map with the new host (copy-then-build so readers never
// see a partially built map), then emits the HOST_ADD event.
void Cluster::notify_host_add_after_prepare(const Host::Ptr& host) {
  if (token_map_) {
    token_map_ = token_map_->copy();
    token_map_->update_host_and_build(host);
    notify_or_record(ClusterEvent(token_map_));
  }
  notify_or_record(ClusterEvent(ClusterEvent::HOST_ADD, host));
}
void Cluster::notify_host_remove(const Address& address) {
LockedHostMap::const_iterator it = hosts_.find(address);
if (it == hosts_.end()) {
LOG_WARN("Attempting removing host %s that we don't have", address.to_string().c_str());
return;
}
Host::Ptr host(it->second);
if (token_map_) {
token_map_ = token_map_->copy();
token_map_->remove_host_and_build(host);
notify_or_record(ClusterEvent(token_map_));
}
// If not marked down yet then explicitly trigger the event.
if (load_balancing_policy_->is_host_up(address)) {
notify_or_record(ClusterEvent(ClusterEvent::HOST_DOWN, host));
}
hosts_.erase(it->first);
for (LoadBalancingPolicy::Vec::const_iterator it = load_balancing_policies_.begin(),
end = load_balancing_policies_.end();
it != end; ++it) {
(*it)->on_host_removed(host);
}
notify_or_record(ClusterEvent(ClusterEvent::HOST_REMOVE, host));
}
// Delivers an event to the listener immediately, or buffers it while events
// are still being recorded during startup (see internal_start_events()).
void Cluster::notify_or_record(const ClusterEvent& event) {
  if (!is_recording_events_) {
    ClusterEvent::process_event(event, listener_);
  } else {
    recorded_events_.push_back(event);
  }
}
// Starts asynchronously preparing the cached statements on a host.
// Returns true when a preparation was started (the callback will fire later),
// false when preparation is disabled or there is no control connection.
bool Cluster::prepare_host(const Host::Ptr& host, const PrepareHostHandler::Callback& callback) {
  if (connection_ && settings_.prepare_on_up_or_add_host) {
    PrepareHostHandler::Ptr prepare_host_handler(
        new PrepareHostHandler(host, prepared_metadata_.copy(), callback,
                               connection_->protocol_version(), settings_.max_prepares_per_flush));

    prepare_host_handler->prepare(connection_->loop(),
                                  settings_.control_connection_settings.connection_settings);
    return true;
  }
  return false;
}
// Completion callbacks for prepare_host(): emit the deferred add/up events.
void Cluster::on_prepare_host_add(const PrepareHostHandler* handler) {
  notify_host_add_after_prepare(handler->host());
}

void Cluster::on_prepare_host_up(const PrepareHostHandler* handler) {
  notify_host_up_after_prepare(handler->host());
}
// Control connection schema-change callback: applies an incremental metadata
// update for the changed schema object. Keyspace changes also rebuild the
// token map (replication settings may have changed).
void Cluster::on_update_schema(SchemaType type, const ResultResponse::Ptr& result,
                               const String& keyspace_name, const String& target_name) {
  switch (type) {
    case KEYSPACE:
      // Virtual keyspaces are not updated (always false)
      metadata_.update_keyspaces(result.get(), false);
      if (token_map_) {
        token_map_ = token_map_->copy();
        token_map_->update_keyspaces_and_build(connection_->server_version(), result.get());
        notify_or_record(ClusterEvent(token_map_));
      }
      break;
    case TABLE:
      metadata_.update_tables(result.get());
      break;
    case VIEW:
      metadata_.update_views(result.get());
      break;
    case COLUMN:
      metadata_.update_columns(result.get());
      break;
    case INDEX:
      metadata_.update_indexes(result.get());
      break;
    case USER_TYPE:
      metadata_.update_user_types(result.get());
      break;
    case FUNCTION:
      metadata_.update_functions(result.get());
      break;
    case AGGREGATE:
      metadata_.update_aggregates(result.get());
      break;
  }
}
// Control connection schema-drop callback: removes the dropped object from
// the metadata. Dropping a keyspace also rebuilds the token map.
void Cluster::on_drop_schema(SchemaType type, const String& keyspace_name,
                             const String& target_name) {
  switch (type) {
    case KEYSPACE:
      metadata_.drop_keyspace(keyspace_name);
      if (token_map_) {
        token_map_ = token_map_->copy();
        token_map_->drop_keyspace(keyspace_name);
        notify_or_record(ClusterEvent(token_map_));
      }
      break;
    // Tables and materialized views are dropped through the same call.
    case TABLE:
    case VIEW:
      metadata_.drop_table_or_view(keyspace_name, target_name);
      break;
    case USER_TYPE:
      metadata_.drop_user_type(keyspace_name, target_name);
      break;
    case FUNCTION:
      metadata_.drop_function(keyspace_name, target_name);
      break;
    case AGGREGATE:
      metadata_.drop_aggregate(keyspace_name, target_name);
      break;
    default:
      // Other schema types (e.g. columns/indexes) need no explicit drop here.
      break;
  }
}
// Control connection UP event: emits HOST_MAYBE_UP (the host is only
// confirmed up later via notify_host_up()).
void Cluster::on_up(const Address& address) {
  LockedHostMap::const_iterator it = hosts_.find(address);
  if (it == hosts_.end()) {
    LOG_WARN("Received UP event for an unknown host %s", address.to_string().c_str());
    return;
  }
  notify_or_record(ClusterEvent(ClusterEvent::HOST_MAYBE_UP, it->second));
}

void Cluster::on_down(const Address& address) {
  // Ignore down events from the control connection. Use the method
  // `notify_host_down()` to trigger the DOWN status.
}

// Topology events delegate straight to the add/remove notification paths.
void Cluster::on_add(const Host::Ptr& host) { notify_host_add(host); }

void Cluster::on_remove(const Address& address) { notify_host_remove(address); }
// Control connection closed: during shutdown this completes the close,
// otherwise the connection was lost and a reconnect is scheduled.
void Cluster::on_close(ControlConnection* connection) {
  if (is_closing_) {
    handle_close();
    return;
  }
  LOG_WARN("Lost control connection to host %s", connection_->address_string().c_str());
  schedule_reconnect();
}