blob: 814a50caa85ec978a919f4d0d9c589a799546d3e [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "cluster_connector.hpp"
#include "dc_aware_policy.hpp"
#include "protocol.hpp"
#include "random.hpp"
#include "round_robin_policy.hpp"
using namespace datastax;
using namespace datastax::internal;
using namespace datastax::internal::core;
namespace datastax { namespace internal { namespace core {
/**
* A task for running the connection process.
*/
class RunResolveAndConnectCluster : public Task {
public:
RunResolveAndConnectCluster(const ClusterConnector::Ptr& connector)
: connector_(connector) {}
void run(EventLoop* event_loop) { connector_->internal_resolve_and_connect(); }
private:
ClusterConnector::Ptr connector_;
};
/**
* A task for canceling the connection process.
*/
class RunCancelCluster : public Task {
public:
RunCancelCluster(const ClusterConnector::Ptr& connector)
: connector_(connector) {}
void run(EventLoop* event_loop) { connector_->internal_cancel(); }
private:
ClusterConnector::Ptr connector_;
};
}}} // namespace datastax::internal::core
ClusterConnector::ClusterConnector(const AddressVec& contact_points,
ProtocolVersion protocol_version, const Callback& callback)
: remaining_connector_count_(0)
, contact_points_(contact_points)
, protocol_version_(protocol_version)
, listener_(NULL)
, event_loop_(NULL)
, random_(NULL)
, metrics_(NULL)
, callback_(callback)
, error_code_(CLUSTER_OK)
, ssl_error_code_(CASS_OK) {}
ClusterConnector* ClusterConnector::with_listener(ClusterListener* listener) {
listener_ = listener;
return this;
}
ClusterConnector* ClusterConnector::with_random(Random* random) {
random_ = random;
return this;
}
ClusterConnector* ClusterConnector::with_metrics(Metrics* metrics) {
metrics_ = metrics;
return this;
}
void ClusterConnector::connect(EventLoop* event_loop) {
event_loop_ = event_loop;
event_loop_->add(new RunResolveAndConnectCluster(Ptr(this)));
}
void ClusterConnector::cancel() {
if (event_loop_) {
event_loop_->add(new RunCancelCluster(Ptr(this)));
}
}
Cluster::Ptr ClusterConnector::release_cluster() {
Cluster::Ptr temp(cluster_);
cluster_.reset();
return temp;
}
void ClusterConnector::internal_resolve_and_connect() {
inc_ref();
if (random_ && !contact_points_.empty()) {
random_shuffle(contact_points_.begin(), contact_points_.end(), random_);
}
resolver_ = settings_.cluster_metadata_resolver_factory->new_instance(settings_);
resolver_->resolve(event_loop_->loop(), contact_points_,
bind_callback(&ClusterConnector::on_resolve, this));
}
void ClusterConnector::internal_connect(const Address& address, ProtocolVersion version) {
Host::Ptr host(new Host(address));
ControlConnector::Ptr connector(
new ControlConnector(host, version, bind_callback(&ClusterConnector::on_connect, this)));
connectors_[address] = connector; // Keep track of the connectors so they can be canceled.
connector->with_metrics(metrics_)
->with_settings(settings_.control_connection_settings)
->connect(event_loop_->loop());
}
void ClusterConnector::internal_cancel() {
error_code_ = CLUSTER_CANCELED;
if (resolver_) resolver_->cancel();
for (ConnectorMap::iterator it = connectors_.begin(), end = connectors_.end(); it != end; ++it) {
it->second->cancel();
}
if (cluster_) cluster_->close();
}
void ClusterConnector::finish() {
callback_(this);
if (cluster_) {
// If the callback doesn't take possession of the cluster then we should
// also clear the listener.
cluster_->set_listener();
cluster_->close();
}
// Explicitly release resources on the event loop thread.
resolver_.reset();
connectors_.clear();
cluster_.reset();
dec_ref();
}
void ClusterConnector::maybe_finish() {
if (remaining_connector_count_ > 0 && --remaining_connector_count_ == 0) {
finish();
}
}
void ClusterConnector::on_error(ClusterConnector::ClusterError code, const String& message) {
assert(code != CLUSTER_OK && "Notified error without an error");
error_message_ = message;
error_code_ = code;
maybe_finish();
}
void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) {
if (is_canceled()) {
finish();
return;
}
const AddressVec& resolved_contact_points(resolver->resolved_contact_points());
if (resolved_contact_points.empty()) {
error_code_ = CLUSTER_ERROR_NO_HOSTS_AVAILABLE;
error_message_ = "Unable to connect to any contact points";
finish();
return;
}
local_dc_ = resolver->local_dc();
remaining_connector_count_ = resolved_contact_points.size();
for (AddressVec::const_iterator it = resolved_contact_points.begin(),
end = resolved_contact_points.end();
it != end; ++it) {
internal_connect(*it, protocol_version_);
}
}
void ClusterConnector::on_connect(ControlConnector* connector) {
// Ignore logging protocol errors here because they're handled below
if (!connector->is_ok() && !connector->is_canceled() && !connector->is_invalid_protocol()) {
LOG_ERROR(
"Unable to establish a control connection to host %s because of the following error: %s",
connector->address().to_string().c_str(), connector->error_message().c_str());
}
// If the cluster object is successfully initialized or if the connector is
// canceled then attempt to finish the connection process.
if (cluster_ || is_canceled()) {
maybe_finish();
return;
}
// Otherwise, initialize the cluster and handle errors.
if (connector->is_ok()) {
const HostMap& hosts(connector->hosts());
LoadBalancingPolicy::Vec policies;
HostMap::const_iterator host_it = hosts.find(connector->address());
if (host_it == hosts.end()) {
// This error is unlikely to happen and it likely means that the local
// metadata is corrupt or missing and the control connection process
// would've probably failed before this happens.
LOG_ERROR("Current control connection host %s not found in hosts metadata",
connector->address().to_string().c_str());
on_error(CLUSTER_ERROR_NO_HOSTS_AVAILABLE,
"Control connection host is not found in hosts metadata");
return;
}
Host::Ptr connected_host(host_it->second);
// Build policies list including the default policy
LoadBalancingPolicy::Ptr default_policy(settings_.load_balancing_policy->new_instance());
policies.push_back(default_policy);
for (LoadBalancingPolicy::Vec::const_iterator it = settings_.load_balancing_policies.begin(),
end = settings_.load_balancing_policies.end();
it != end; ++it) {
policies.push_back(LoadBalancingPolicy::Ptr((*it)->new_instance()));
}
// Initialize all created policies
for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
it != end; ++it) {
LoadBalancingPolicy::Ptr policy(*it);
policy->init(connected_host, hosts, random_, local_dc_);
policy->register_handles(event_loop_->loop());
}
ScopedPtr<QueryPlan> query_plan(default_policy->new_query_plan("", NULL, NULL));
if (!query_plan->compute_next()) { // No hosts in the query plan
LOG_ERROR("Current control connection host %s has no hosts available in "
"it's query plan for the configured load balancing policy. If "
"using DC-aware check to see if the local datacenter is valid.",
connector->address().to_string().c_str());
const char* message;
if (dynamic_cast<DCAwarePolicy::DCAwareQueryPlan*>(query_plan.get()) !=
NULL) { // Check if DC-aware
message = "No hosts available for the control connection using the "
"DC-aware load balancing policy. "
"Check to see if the configured local datacenter is valid";
} else {
message = "No hosts available for the control connection using the "
"configured load balancing policy";
}
on_error(CLUSTER_ERROR_NO_HOSTS_AVAILABLE, message);
return;
}
cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_,
connected_host, hosts, connector->schema(), default_policy, policies,
local_dc_, connector->supported_options(), settings_));
// Clear any connection errors and set the final negotiated protocol version.
error_code_ = CLUSTER_OK;
error_message_.clear();
protocol_version_ = connector->protocol_version();
// The cluster is initialized so the rest of the connectors can be canceled.
for (ConnectorMap::iterator it = connectors_.begin(), end = connectors_.end(); it != end;
++it) {
if (it->first != connector->address()) { // Not the current connector.
it->second->cancel();
}
}
maybe_finish();
} else if (connector->is_invalid_protocol()) {
ProtocolVersion lower_version(connector->protocol_version().previous());
if (!lower_version.is_valid()) {
LOG_ERROR(
"Host %s does not support any valid protocol version (lowest supported version is %s)",
connector->address().to_string().c_str(),
ProtocolVersion::lowest_supported().to_string().c_str());
on_error(CLUSTER_ERROR_INVALID_PROTOCOL, "Unable to find supported protocol version");
return;
}
LOG_INFO("Host %s does not support protocol version %s. "
"Trying protocol version %s...",
connector->address().to_string().c_str(),
connector->protocol_version().to_string().c_str(), lower_version.to_string().c_str());
internal_connect(connector->address(), lower_version);
} else if (connector->is_ssl_error()) {
ssl_error_code_ = connector->ssl_error_code();
on_error(CLUSTER_ERROR_SSL_ERROR, connector->error_message());
} else if (connector->is_auth_error()) {
on_error(CLUSTER_ERROR_AUTH_ERROR, connector->error_message());
} else {
assert(!connector->is_canceled() &&
"The control connector should have an error and not be canceled");
on_error(CLUSTER_ERROR_NO_HOSTS_AVAILABLE, connector->error_message());
}
}