blob: 8abcef96ad01b946514946e90feb30d5c7ef8e50 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "connection_pool_connector.hpp"
#include "event_loop.hpp"
#include "metrics.hpp"
using namespace datastax;
using namespace datastax::internal::core;
// Creates a connector that establishes the connections of a pool for a single
// host. Nothing is started here; connect() begins the actual attempts.
//
// host:             the host every connection in the pool is opened to.
// protocol_version: protocol version handed to each individual Connector.
// callback:         invoked with this connector once the last outstanding
//                   connection attempt has reported back (see on_connect()).
//
// The event loop, listener and metrics start out as NULL; the loop is assigned
// by connect(), the others via with_listener() / with_metrics().
ConnectionPoolConnector::ConnectionPoolConnector(const Host::Ptr& host,
ProtocolVersion protocol_version,
const Callback& callback)
: loop_(NULL)
, callback_(callback)
, is_canceled_(false)
, remaining_(0)
, host_(host)
, protocol_version_(protocol_version)
, listener_(NULL)
, metrics_(NULL) {}
// Sets the listener that is handed to the created pool and that is notified
// when the pool cannot be created because of a critical error. May be NULL.
// Returns this connector so configuration calls can be chained.
ConnectionPoolConnector* ConnectionPoolConnector::with_listener(ConnectionPoolListener* listener) {
  this->listener_ = listener;
  return this;
}
// Sets the keyspace that is forwarded to every Connector started by connect()
// and to the resulting pool.
// Returns this connector so configuration calls can be chained.
ConnectionPoolConnector* ConnectionPoolConnector::with_keyspace(const String& keyspace) {
  this->keyspace_ = keyspace;
  return this;
}
// Sets the metrics object that is forwarded to every Connector started by
// connect() and to the resulting pool. May be NULL.
// Returns this connector so configuration calls can be chained.
ConnectionPoolConnector* ConnectionPoolConnector::with_metrics(Metrics* metrics) {
  this->metrics_ = metrics;
  return this;
}
// Stores a copy of the pool settings. connect() reads the number of
// connections per host and the per-connection settings from this copy.
// Returns this connector so configuration calls can be chained.
ConnectionPoolConnector*
ConnectionPoolConnector::with_settings(const ConnectionPoolSettings& settings) {
  this->settings_ = settings;
  return this;
}
// Starts one Connector per configured connection on the given event loop.
//
// A reference on this object is taken up front; it is dropped again in
// on_connect() once the last attempt has reported back.
//
// NOTE(review): if settings_.num_connections_per_host is 0 no Connector is
// started, so on_connect() is never reached from here and neither the callback
// nor the matching dec_ref() runs - confirm the value is validated elsewhere.
void ConnectionPoolConnector::connect(uv_loop_t* loop) {
  inc_ref();
  loop_ = loop;
  remaining_ = settings_.num_connections_per_host;

  size_t started = 0;
  while (started < settings_.num_connections_per_host) {
    Connector::Ptr attempt(new Connector(
        host_, protocol_version_, bind_callback(&ConnectionPoolConnector::on_connect, this)));

    // Track the attempt before starting it so it can be canceled later.
    pending_connections_.push_back(attempt);

    attempt->with_keyspace(keyspace_)
        ->with_metrics(metrics_)
        ->with_settings(settings_.connection_settings)
        ->connect(loop);

    ++started;
  }
}
void ConnectionPoolConnector::cancel() {
is_canceled_ = true;
if (pool_) {
pool_->close();
} else {
for (Connector::Vec::iterator it = pending_connections_.begin(),
end = pending_connections_.end();
it != end; ++it) {
(*it)->cancel();
}
for (Connection::Vec::iterator it = connections_.begin(), end = connections_.end(); it != end;
++it) {
(*it)->close();
}
}
}
// Hands the created pool over to the caller and clears the connector's own
// reference to it. on_connect() closes a pool that is still held here after
// the callback returns, so a callback that wants to keep the pool calls this.
// The returned pointer is empty if no pool is currently held.
ConnectionPool::Ptr ConnectionPoolConnector::release_pool() {
  ConnectionPool::Ptr released(pool_);
  pool_.reset();
  return released;
}
// Returns the error code of the connector that reported the first critical
// error, or Connector::CONNECTION_OK when no critical error was recorded.
Connector::ConnectionError ConnectionPoolConnector::error_code() const {
  if (!critical_error_connector_) {
    return Connector::CONNECTION_OK;
  }
  return critical_error_connector_->error_code();
}
// Returns the error message of the connector that reported the first critical
// error, or an empty string when no critical error was recorded.
String ConnectionPoolConnector::error_message() const {
  if (critical_error_connector_) {
    return critical_error_connector_->error_message();
  }
  return "";
}
// True when no critical error has been recorded.
bool ConnectionPoolConnector::is_ok() const {
  const bool failed = is_critical_error();
  return !failed;
}
// True when one of the connectors reported a critical error.
bool ConnectionPoolConnector::is_critical_error() const {
  return critical_error_connector_ ? true : false;
}
// True when a critical error was recorded and the connector that reported it
// classifies it as a keyspace error; false otherwise.
bool ConnectionPoolConnector::is_keyspace_error() const {
  if (!critical_error_connector_) {
    return false;
  }
  return critical_error_connector_->is_keyspace_error();
}
// Completion callback bound to every Connector started by connect().
//
// connector: the attempt that just finished (successfully, with an error, or
//            because it was canceled).
//
// Successful attempts contribute their connection to the future pool. The
// first critical error is remembered and causes all still-pending attempts to
// be canceled. Once the last attempt has reported back, the pool is created
// (or the listener is told about the critical error), the user callback runs,
// and the reference taken in connect() is released.
void ConnectionPoolConnector::on_connect(Connector* connector) {
  // This attempt is finished; stop tracking it as pending.
  pending_connections_.erase(
      std::remove(pending_connections_.begin(), pending_connections_.end(), connector),
      pending_connections_.end());

  if (connector->is_ok()) {
    connections_.push_back(connector->release_connection());
  } else if (!connector->is_canceled()) {
    LOG_ERROR("Connection pool was unable to connect to host %s because of the following error: %s",
              host_->address().to_string().c_str(), connector->error_message().c_str());

    // Only the first critical error is recorded; the remaining attempts are
    // pointless at that point, so cancel them.
    if (connector->is_critical_error() && !critical_error_connector_) {
      critical_error_connector_.reset(connector);
      for (Connector::Vec::iterator it = pending_connections_.begin(),
                                    end = pending_connections_.end();
           it != end; ++it) {
        (*it)->cancel();
      }
    }
  }

  // Wait until every attempt has reported back.
  if (--remaining_ != 0) {
    return;
  }

  if (!is_canceled_) {
    if (!critical_error_connector_) {
      pool_.reset(new ConnectionPool(connections_, listener_, keyspace_, loop_, host_,
                                     protocol_version_, settings_, metrics_));
    } else {
      // No pool will take over the connections that were established before
      // the critical error was seen. Close them here, as cancel() does, so
      // they are not left open.
      for (Connection::Vec::iterator it = connections_.begin(), end = connections_.end();
           it != end; ++it) {
        (*it)->close();
      }

      if (listener_) {
        listener_->on_pool_critical_error(host_->address(),
                                          critical_error_connector_->error_code(),
                                          critical_error_connector_->error_message());
      }
    }
  }

  callback_(this);

  // If the pool hasn't been released then close it.
  if (pool_) {
    // If the callback doesn't take possession of the pool then we should
    // also clear the listener.
    pool_->set_listener();
    pool_->close();
  }

  // Matches the inc_ref() in connect().
  dec_ref();
}