blob: 33eb0370e6206a7ef6055ae34e2b7486dca7f9ad [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef DATASTAX_INTERNAL_CLUSTER_HPP
#define DATASTAX_INTERNAL_CLUSTER_HPP
#include "config.hpp"
#include "control_connector.hpp"
#include "event_loop.hpp"
#include "external.hpp"
#include "metadata.hpp"
#include "monitor_reporting.hpp"
#include "prepare_host_handler.hpp"
#include "prepared.hpp"
#include <uv.h>
namespace datastax { namespace internal { namespace core {
class Cluster;
class LockedHostMap {
public:
typedef HostMap::iterator iterator;
typedef HostMap::const_iterator const_iterator;
LockedHostMap(const HostMap& hosts);
~LockedHostMap();
operator const HostMap&() const { return hosts_; }
const_iterator begin() const { return hosts_.begin(); }
const_iterator end() const { return hosts_.end(); }
const_iterator find(const Address& address) const;
Host::Ptr get(const Address& address) const;
void erase(const Address& address);
Host::Ptr& operator[](const Address& address);
LockedHostMap& operator=(const HostMap& hosts);
private:
mutable uv_mutex_t mutex_;
HostMap hosts_;
};
/**
* A listener that handles token map updates.
*/
class TokenMapListener {
public:
virtual ~TokenMapListener() {}
/**
* A callback that's called when the token map has changed. This happens as
* a result of the token map being rebuilt which can happen if keyspace metadata
* has changed or if node is added/removed from a cluster.
*
* @param token_map The updated token map.
*/
virtual void on_token_map_updated(const TokenMap::Ptr& token_map) = 0;
};
/**
* A listener that handles cluster events.
*/
class ClusterListener
: public HostListener
, public TokenMapListener {
public:
typedef Vector<ClusterListener*> Vec;
virtual ~ClusterListener() {}
/**
* A callback that's called when the control connection receives an up event.
* It means that the host might be available to handle queries, but not
* necessarily.
*
* @param host A host that may be available.
*/
virtual void on_host_maybe_up(const Host::Ptr& host) {}
/**
* A callback that's called as the result of `Cluster::notify_host_up()`.
* It's *always* called for a valid (not ignored) host that's ready to
* receive queries. The ready state means the host has had any previously
* prepared queries setup on the newly available server. If the host was
* previously ready the callback is just called.
*
* @param host A host that's ready to receive queries.
*/
virtual void on_host_ready(const Host::Ptr& host) {}
/**
* A callback that's called when the cluster connects or reconnects to a host.
*
* Note: This is mostly for testing.
*
* @param cluster The cluster object.
*/
virtual void on_reconnect(Cluster* cluster) {}
/**
* A callback that's called when the cluster has closed.
*
* @param cluster The cluster object.
*/
virtual void on_close(Cluster* cluster) = 0;
};
/**
* A class for recording host and token map events so they can be replayed.
*/
struct ClusterEvent {
typedef Vector<ClusterEvent> Vec;
enum Type {
HOST_UP,
HOST_DOWN,
HOST_ADD,
HOST_REMOVE,
HOST_MAYBE_UP,
HOST_READY,
TOKEN_MAP_UPDATE
};
ClusterEvent(Type type, const Host::Ptr& host)
: type(type)
, host(host) {}
ClusterEvent(const TokenMap::Ptr& token_map)
: type(TOKEN_MAP_UPDATE)
, token_map(token_map) {}
static void process_event(const ClusterEvent& event, ClusterListener* listener);
static void process_events(const Vec& events, ClusterListener* listener);
Type type;
Host::Ptr host;
TokenMap::Ptr token_map;
};
/**
* Cluster settings.
*/
struct ClusterSettings {
/**
* Constructor. Initialize with default settings.
*/
ClusterSettings();
/**
* Constructor. Initialize with a config object.
*
* @param config The config object.
*/
ClusterSettings(const Config& config);
/**
* The settings for the underlying control connection.
*/
ControlConnectionSettings control_connection_settings;
/**
* The load balancing policy to use for reconnecting the control
* connection.
*/
LoadBalancingPolicy::Ptr load_balancing_policy;
/**
* Load balancing policies for all profiles.
*/
LoadBalancingPolicy::Vec load_balancing_policies;
/**
* The port to use for the contact points. This setting is spread to
* the other hosts using the contact point hosts.
*/
int port;
/**
* Reconnection policy to use when attempting to reconnect the control connection.
*/
ReconnectionPolicy::Ptr reconnection_policy;
/**
* If true then cached prepared statements are prepared when a host is brought
* up or is added.
*/
bool prepare_on_up_or_add_host;
/**
* Max number of requests to be written out to the socket per write system call.
*/
unsigned max_prepares_per_flush;
/**
* If true then events are disabled on startup. Events can be explicitly
* started by calling `Cluster::start_events()`.
*/
bool disable_events_on_startup;
/**
* A factory for creating cluster metadata resolvers. A cluster metadata resolver is used to
* determine contact points and retrieve other metadata required to connect the
* cluster.
*/
ClusterMetadataResolverFactory::Ptr cluster_metadata_resolver_factory;
};
/**
* A cluster connection. This wraps and maintains a control connection to a
* cluster. If a host in the cluster fails then it re-establishes a new control
* connection to a different host. A cluster will never close without an
* explicit call to close because it repeatedly tries to re-establish its
* connection even if no hosts are available.
*/
class Cluster
: public RefCounted<Cluster>
, public ControlConnectionListener {
public:
typedef SharedRefPtr<Cluster> Ptr;
/**
* Constructor. Don't use directly.
*
* @param connection The current control connection.
* @param listener A listener to handle cluster events.
* @param event_loop The event loop.
* @param connected_host The currently connected host.
* @param hosts Available hosts for the cluster (based on load balancing
* policies).
* @param schema Current schema metadata.
* @param load_balancing_policy The default load balancing policy to use for
* determining the next control connection host.
* @param load_balancing_policies
* @param local_dc The local datacenter determined by the metadata service for initializing the
* load balancing policies.
* @param supported_options Supported options discovered during control connection.
* @param settings The control connection settings to use for reconnecting the
* control connection.
*/
Cluster(const ControlConnection::Ptr& connection, ClusterListener* listener,
EventLoop* event_loop, const Host::Ptr& connected_host, const HostMap& hosts,
const ControlConnectionSchema& schema,
const LoadBalancingPolicy::Ptr& load_balancing_policy,
const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
const StringMultimap& supported_options, const ClusterSettings& settings);
/**
* Set the listener that will handle events for the cluster
* (*NOT* thread-safe).
*
* @param listener The cluster listener.
*/
void set_listener(ClusterListener* listener = NULL);
/**
* Close the current connection and stop the re-connection process (thread-safe).
*/
void close();
/**
* Notify that a node has been determined to be available via an external
* source (thread-safe).
*
* @param address The address of the host that is now available.
*/
void notify_host_up(const Address& address);
/**
* Notify that a node has been determined to be down via an external source.
* DOWN events from the control connection are ignored so it is up to other
* sources to determine a host is unavailable (thread-safe).
*
* @param address That address of the host that is now unavailable.
*/
void notify_host_down(const Address& address);
/**
* Start host and token map events. Events that occurred during startup will be
* replayed (thread-safe).
*/
void start_events();
/**
* Start the client monitor events (thread-safe).
*
* @param client_id Client ID associated with the session.
* @param session_id Session ID associated with the session.
* @param config The config object.
*/
void start_monitor_reporting(const String& client_id, const String& session_id,
const Config& config);
/**
* Get the latest snapshot of the schema metadata (thread-safe).
*
* @return A schema metadata snapshot.
*/
Metadata::SchemaSnapshot schema_snapshot();
/**
* Look up a host by address (thread-safe).
*
* @param address The address of the host.
* @return The host object for the specified address or a null object pointer
* if the host doesn't exist.
*/
Host::Ptr find_host(const Address& address) const;
/**
* Get a prepared metadata entry for a prepared ID (thread-safe).
*
* @param id A prepared ID
* @return The prepare metadata object for the specified ID or a null object
* pointer if the entry doesn't exist.
*/
PreparedMetadata::Entry::Ptr prepared(const String& id) const;
/**
* Set the prepared metadata for a given prepared ID (thread-safe).
*
* @param id A prepared ID.
* @param entry A prepared metadata entry.
*/
void prepared(const String& id, const PreparedMetadata::Entry::Ptr& entry);
/**
* Get available hosts (determined by host distance). This filters out ignored
* hosts (*NOT* thread-safe).
*
* @return A mapping of available hosts.
*/
HostMap available_hosts() const;
public:
ProtocolVersion protocol_version() const { return connection_->protocol_version(); }
const Host::Ptr& connected_host() const { return connected_host_; }
const TokenMap::Ptr& token_map() const { return token_map_; }
const String& local_dc() const { return local_dc_; }
const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); }
const StringMultimap& supported_options() const { return supported_options_; }
private:
friend class ClusterRunClose;
friend class ClusterNotifyUp;
friend class ClusterNotifyDown;
friend class ClusterStartEvents;
friend class ClusterStartClientMonitor;
private:
void update_hosts(const HostMap& hosts);
void update_schema(const ControlConnectionSchema& schema);
void update_token_map(const HostMap& hosts, const String& partitioner,
const ControlConnectionSchema& schema);
bool is_host_ignored(const Host::Ptr& host) const;
void schedule_reconnect();
void on_schedule_reconnect(Timer* timer);
void handle_schedule_reconnect();
void on_reconnect(ControlConnector* connector);
private:
void internal_close();
void handle_close();
void internal_notify_host_up(const Address& address);
void notify_host_up_after_prepare(const Host::Ptr& host);
void internal_notify_host_down(const Address& address);
void internal_start_events();
void internal_start_monitor_reporting(const String& client_id, const String& session_id,
const Config& config);
void on_monitor_reporting(Timer* timer);
void notify_host_add(const Host::Ptr& host);
void notify_host_add_after_prepare(const Host::Ptr& host);
void notify_host_remove(const Address& address);
private:
void notify_or_record(const ClusterEvent& event);
private:
bool prepare_host(const Host::Ptr& host, const PrepareHostHandler::Callback& callback);
void on_prepare_host_add(const PrepareHostHandler* handler);
void on_prepare_host_up(const PrepareHostHandler* handler);
private:
// Control connection listener methods
virtual void on_update_schema(SchemaType type, const ResultResponse::Ptr& result,
const String& keyspace_name, const String& target_name);
virtual void on_drop_schema(SchemaType type, const String& keyspace_name,
const String& target_name);
virtual void on_up(const Address& address);
virtual void on_down(const Address& address);
virtual void on_add(const Host::Ptr& host);
virtual void on_remove(const Address& address);
virtual void on_close(ControlConnection* connection);
private:
ControlConnection::Ptr connection_;
ControlConnector::Ptr reconnector_;
ClusterListener* listener_;
EventLoop* const event_loop_;
const LoadBalancingPolicy::Ptr load_balancing_policy_;
LoadBalancingPolicy::Vec load_balancing_policies_;
const ClusterSettings settings_;
ScopedPtr<QueryPlan> query_plan_;
bool is_closing_;
Host::Ptr connected_host_;
LockedHostMap hosts_;
Metadata metadata_;
PreparedMetadata prepared_metadata_;
TokenMap::Ptr token_map_;
String local_dc_;
StringMultimap supported_options_;
Timer timer_;
bool is_recording_events_;
ClusterEvent::Vec recorded_events_;
ScopedPtr<MonitorReporting> monitor_reporting_;
Timer monitor_reporting_timer_;
ScopedPtr<ReconnectionSchedule> reconnection_schedule_;
};
}}} // namespace datastax::internal::core
#endif