blob: 9018d5107f0b0ed63136b29a5db9761526ed4d53 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef DATASTAX_INTERNAL_CONNECTION_POOL_HPP
#define DATASTAX_INTERNAL_CONNECTION_POOL_HPP
#include "address.hpp"
#include "delayed_connector.hpp"
#include "dense_hash_map.hpp"
#include "pooled_connection.hpp"
#include "reconnection_policy.hpp"
#include <uv.h>
namespace datastax { namespace internal { namespace core {
class ConnectionPool;
class ConnectionPoolConnector;
class ConnectionPoolManager;
class EventLoop;
class ConnectionPoolStateListener {
public:
virtual ~ConnectionPoolStateListener() {}
/**
* A callback that's called when a host is up.
*
* @param address The address of the host.
*/
virtual void on_pool_up(const Address& address) = 0;
/**
* A callback that's called when a host is down.
*
* @param address The address of the host.
*/
virtual void on_pool_down(const Address& address) = 0;
/**
* A callback that's called when a host has a critical error
* during reconnection.
*
* The following are critical errors:
* * Invalid keyspace
* * Invalid protocol version
* * Authentication failure
* * SSL failure
*
* @param address The address of the host.
* @param code The code of the critical error.
* @param message The message of the critical error.
*/
virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code,
const String& message) = 0;
};
class ConnectionPoolListener : public ConnectionPoolStateListener {
public:
virtual void on_requires_flush(ConnectionPool* pool) {}
virtual void on_close(ConnectionPool* pool) = 0;
};
/**
* The connection pool settings.
*/
struct ConnectionPoolSettings {
/**
* Constructor. Initialize with default settings.
*/
ConnectionPoolSettings();
/**
* Constructor. Initialize the pool settings from a config object.
*
* @param config The config object.
*/
ConnectionPoolSettings(const Config& config);
ConnectionSettings connection_settings;
size_t num_connections_per_host;
ReconnectionPolicy::Ptr reconnection_policy;
};
/**
* A pool of connections to the same host.
*/
class ConnectionPool : public RefCounted<ConnectionPool> {
public:
typedef SharedRefPtr<ConnectionPool> Ptr;
typedef DenseHashMap<DelayedConnector*, ReconnectionSchedule*> ReconnectionSchedules;
class Map : public DenseHashMap<Address, Ptr> {
public:
Map() {
set_empty_key(Address::EMPTY_KEY);
set_deleted_key(Address::DELETED_KEY);
}
};
/**
* Constructor. Don't use directly.
*
* @param connections
* @param listener
* @param keyspace
* @param loop
* @param host
* @param protocol_version
* @param settings
* @param metrics
*/
ConnectionPool(const Connection::Vec& connections, ConnectionPoolListener* listener,
const String& keyspace, uv_loop_t* loop, const Host::Ptr& host,
ProtocolVersion protocol_version, const ConnectionPoolSettings& settings,
Metrics* metrics);
/**
* Find the least busy connection for the pool. The least busy connection has
* the lowest number of outstanding requests and is not closed.
*
* @return The least busy connection or null if no connection is available.
*/
PooledConnection::Ptr find_least_busy() const;
/**
* Determine if the pool has any valid connections.
*
* @return Returns true if the pool has valid connections.
*/
bool has_connections() const;
/**
* Trigger immediate connection of any delayed (reconnecting) connections.
*/
void attempt_immediate_connect();
/**
* Flush connections with pending writes.
*/
void flush();
/**
* Close the pool.
*/
void close();
/**
* Set the listener that will handle events for the pool.
*
* @param listener The pool listener.
*/
void set_listener(ConnectionPoolListener* listener = NULL);
public:
const uv_loop_t* loop() const { return loop_; }
const Address& address() const { return host_->address(); }
ProtocolVersion protocol_version() const { return protocol_version_; }
const String& keyspace() const { return keyspace_; }
void set_keyspace(const String& keyspace);
public:
class Protected {
friend class PooledConnection;
friend class ConnectionPoolConnector;
Protected() {}
Protected(Protected const&) {}
};
/**
* Remove the connection and schedule a reconnection.
*
* @param connection A connection that closed.
* @param A key to restrict access to the method.
*/
void close_connection(PooledConnection* connection, Protected);
/**
* Add a connection to be flushed.
*
* @param connection A connection with pending writes.
*/
void requires_flush(PooledConnection* connection, Protected);
private:
enum CloseState {
CLOSE_STATE_OPEN,
CLOSE_STATE_CLOSING,
CLOSE_STATE_WAITING_FOR_CONNECTIONS,
CLOSE_STATE_CLOSED
};
enum NotifyState { NOTIFY_STATE_NEW, NOTIFY_STATE_UP, NOTIFY_STATE_DOWN, NOTIFY_STATE_CRITICAL };
private:
friend class NotifyDownOnRemovePoolOp;
private:
void notify_up_or_down();
void notify_critical_error(Connector::ConnectionError code, const String& message);
void add_connection(const PooledConnection::Ptr& connection);
void schedule_reconnect(ReconnectionSchedule* schedule = NULL);
void internal_close();
void maybe_closed();
void on_reconnect(DelayedConnector* connector);
private:
ConnectionPoolListener* listener_;
String keyspace_;
uv_loop_t* const loop_;
const Host::Ptr host_;
const ProtocolVersion protocol_version_;
const ConnectionPoolSettings settings_;
Metrics* const metrics_;
ReconnectionSchedules reconnection_schedules_;
CloseState close_state_;
NotifyState notify_state_;
PooledConnection::Vec connections_;
DelayedConnector::Vec pending_connections_;
DenseHashSet<PooledConnection*> to_flush_;
};
}}} // namespace datastax::internal::core
#endif