| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #ifndef DATASTAX_INTERNAL_CONTROL_CONNECTION_HPP |
| #define DATASTAX_INTERNAL_CONTROL_CONNECTION_HPP |
| |
| #include "address.hpp" |
| #include "address_factory.hpp" |
| #include "config.hpp" |
| #include "connection.hpp" |
| #include "connector.hpp" |
| #include "dense_hash_map.hpp" |
| #include "host.hpp" |
| #include "load_balancing.hpp" |
| #include "macros.hpp" |
| #include "request_callback.hpp" |
| #include "response.hpp" |
| #include "scoped_ptr.hpp" |
| #include "token_map.hpp" |
| |
| #include <stdint.h> |
| |
| #define SELECT_LOCAL "SELECT * FROM system.local WHERE key='local'" |
| #define SELECT_PEERS "SELECT * FROM system.peers" |
| |
| #define SELECT_KEYSPACES_20 "SELECT * FROM system.schema_keyspaces" |
| #define SELECT_COLUMN_FAMILIES_20 "SELECT * FROM system.schema_columnfamilies" |
| #define SELECT_COLUMNS_20 "SELECT * FROM system.schema_columns" |
| #define SELECT_USERTYPES_21 "SELECT * FROM system.schema_usertypes" |
| #define SELECT_FUNCTIONS_22 "SELECT * FROM system.schema_functions" |
| #define SELECT_AGGREGATES_22 "SELECT * FROM system.schema_aggregates" |
| |
| #define SELECT_KEYSPACES_30 "SELECT * FROM system_schema.keyspaces" |
| #define SELECT_TABLES_30 "SELECT * FROM system_schema.tables" |
| #define SELECT_VIEWS_30 "SELECT * FROM system_schema.views" |
| #define SELECT_COLUMNS_30 "SELECT * FROM system_schema.columns" |
| #define SELECT_INDEXES_30 "SELECT * FROM system_schema.indexes" |
| #define SELECT_USERTYPES_30 "SELECT * FROM system_schema.types" |
| #define SELECT_FUNCTIONS_30 "SELECT * FROM system_schema.functions" |
| #define SELECT_AGGREGATES_30 "SELECT * FROM system_schema.aggregates" |
| |
| #define SELECT_VIRTUAL_KEYSPACES_40 "SELECT * FROM system_virtual_schema.keyspaces" |
| #define SELECT_VIRTUAL_TABLES_40 "SELECT * FROM system_virtual_schema.tables" |
| #define SELECT_VIRTUAL_COLUMNS_40 "SELECT * FROM system_virtual_schema.columns" |
| |
| namespace datastax { namespace internal { namespace core { |
| |
| class ChainedControlRequestCallback; |
| class ControlRequestCallback; |
| class ControlConnection; |
| class EventResponse; |
| class RefreshNodeCallback; |
| class RefreshKeyspaceCallback; |
| class RefreshTableCallback; |
| class RefreshTypeCallback; |
| class RefreshFunctionCallback; |
| |
| /** |
| * A listener for processing control connection events such as topology, node |
| * status, and schema changes. |
| */ |
| class ControlConnectionListener { |
| public: |
| enum SchemaType { KEYSPACE, TABLE, VIEW, COLUMN, INDEX, USER_TYPE, FUNCTION, AGGREGATE }; |
| |
| virtual ~ControlConnectionListener() {} |
| |
| /** |
| * A callback that's called when a host is marked as being UP. |
| * |
| * @param address The address of the host. |
| */ |
| virtual void on_up(const Address& address) = 0; |
| |
| /** |
| * A callback that's called when a host is marked as being DOWN. |
| * |
| * @param address The address of the host. |
| */ |
| virtual void on_down(const Address& address) = 0; |
| |
| /** |
| * A callback that's called when a new host is added to the cluster. |
| * |
| * @param host A fully populated host object. |
| */ |
| virtual void on_add(const Host::Ptr& host) = 0; |
| |
| /** |
| * A callback that's called when a host is removed from a cluster. |
| * |
| * @param address The address of the host. |
| */ |
| virtual void on_remove(const Address& address) = 0; |
| |
| /** |
| * A callback that's called when schema is created or updated. Table and |
| * materialized view changes will result in several calls to this method for |
| * the associated columns and indexes. Column and indexes are not updated |
| * without a preceding table or materialized view update. |
| * |
| * @param type The type of the schema changed. |
| * @param result A result response with the row data associated with the |
| * schema change. |
| */ |
| virtual void on_update_schema(SchemaType type, const ResultResponse::Ptr& result, |
| const String& keyspace_name, const String& target_name = "") = 0; |
| |
| /** |
| * A callback that's called when schema is dropped. |
| * |
| * @param type The type of the schema dropped. |
| * @param keyspace_name The keyspace of the schema. |
| * @param target_name The name of the table, user type, function, or |
| * aggregate dropped. This is the empty string for dropped keyspaces. |
| */ |
| virtual void on_drop_schema(SchemaType type, const String& keyspace_name, |
| const String& target_name = "") = 0; |
| |
| /** |
| * A callback that's called when the control connection is closed. |
| * |
| * @param connection The closing control connection. |
| */ |
| virtual void on_close(ControlConnection* connection) = 0; |
| }; |
| |
| /** |
| * A mapping between a host's address and it's listening address. The listening |
| * address is used to look up a peer in the "system.peers" table. |
| */ |
| class ListenAddressMap : public DenseHashMap<Address, String> { |
| public: |
| ListenAddressMap() { |
| set_empty_key(Address::EMPTY_KEY); |
| set_deleted_key(Address::DELETED_KEY); |
| } |
| }; |
| |
| /** |
| * Control connection settings. |
| */ |
| struct ControlConnectionSettings { |
| /** |
| * Constructor. Initialize with default settings. |
| */ |
| ControlConnectionSettings(); |
| |
| /** |
| * Constructor. Initialize the settings from a config object. |
| * |
| * @param config The config object. |
| */ |
| ControlConnectionSettings(const Config& config); |
| |
| /** |
| * The settings for the underlying connection. |
| */ |
| ConnectionSettings connection_settings; |
| |
| /** |
| * If true then the control connection will listen for schema events. |
| */ |
| bool use_schema; |
| |
| /** |
| * If true then the control connection will listen for keyspace schema |
| * events. This is needed for the keyspaces replication strategy. |
| */ |
| bool use_token_aware_routing; |
| |
| /** |
| * A factory for creating addresses (for the connection process). |
| */ |
| AddressFactory::Ptr address_factory; |
| }; |
| |
| /** |
| * A control connection. This is a wrapper around a connection that handles |
| * schema, node status, and topology changes. This class handles events |
| * by running queries on the control connection to get additional information |
| * then passing that data to the listener. |
| */ |
| class ControlConnection |
| : public RefCounted<ControlConnection> |
| , public ConnectionListener { |
| public: |
| typedef SharedRefPtr<ControlConnection> Ptr; |
| |
| enum RefreshNodeType { NEW_NODE, MOVED_NODE }; |
| |
| /** |
| * Constructor. Don't use directly. |
| * |
| * @param connection The wrapped connection. |
| * @param listener A listener to handle events. |
| * @param settings The control connection's settings. |
| * @param server_version The version number of the server implementation. |
| * @param dse_server_version The version number of the DSE server implementation. |
| * @param listen_addresses The current state of the listen addresses map. |
| */ |
| ControlConnection(const Connection::Ptr& connection, ControlConnectionListener* listener, |
| const ControlConnectionSettings& settings, const VersionNumber& server_version, |
| const VersionNumber& dse_server_version, ListenAddressMap listen_addresses); |
| |
| /** |
| * Write a request and flush immediately. |
| * |
| * @param The request callback to write and handle the request. |
| * @return The number of bytes written to the connection. If negative then |
| * an error occurred. |
| */ |
| int32_t write_and_flush(const RequestCallback::Ptr& callback); |
| |
| /** |
| * Close the connection. |
| */ |
| void close(); |
| |
| /** |
| * Close the connection with an error. |
| */ |
| void defunct(); |
| |
| /** |
| * Set the listener that will handle control connection events. |
| * |
| * @param listener The listener. |
| */ |
| void set_listener(ControlConnectionListener* listener = NULL); |
| |
| public: |
| const Address& address() const { return connection_->address(); } |
| |
| const String& address_string() const { return connection_->address_string(); } |
| |
| const Address& resolved_address() const { return connection_->resolved_address(); } |
| |
| ProtocolVersion protocol_version() const { return connection_->protocol_version(); } |
| |
| const VersionNumber& server_version() { return server_version_; } |
| |
| const VersionNumber& dse_server_version() { return dse_server_version_; } |
| |
| uv_loop_t* loop() { return connection_->loop(); } |
| const Connection::Ptr& connection() const { return connection_; } |
| |
| private: |
| friend class ControlConnector; |
| friend class RefreshNodeCallback; |
| friend class RefreshKeyspaceCallback; |
| friend class RefreshTableCallback; |
| friend class RefreshTypeCallback; |
| friend class RefreshFunctionCallback; |
| |
| private: |
| void refresh_node(RefreshNodeType type, const Address& address); |
| static void on_refresh_node(ControlRequestCallback* callback); |
| void handle_refresh_node(RefreshNodeCallback* callback); |
| |
| void refresh_keyspace(const StringRef& keyspace_name); |
| static void on_refresh_keyspace(ControlRequestCallback* callback); |
| void handle_refresh_keyspace(RefreshKeyspaceCallback* callback); |
| |
| void refresh_table_or_view(const StringRef& keyspace_name, const StringRef& table_or_view_name); |
| static void on_refresh_table_or_view(ChainedControlRequestCallback* callback); |
| void handle_refresh_table_or_view(RefreshTableCallback* callback); |
| |
| void refresh_type(const StringRef& keyspace_name, const StringRef& type_name); |
| static void on_refresh_type(ControlRequestCallback* callback); |
| void handle_refresh_type(RefreshTypeCallback* callback); |
| |
| void refresh_function(const StringRef& keyspace_name, const StringRef& function_name, |
| const StringRefVec& arg_types, bool is_aggregate); |
| static void on_refresh_function(ControlRequestCallback* callback); |
| void handle_refresh_function(RefreshFunctionCallback* callback); |
| |
| // Connection listener methods |
| virtual void on_close(Connection* connection); |
| virtual void on_event(const EventResponse::Ptr& response); |
| |
| private: |
| Connection::Ptr connection_; |
| ControlConnectionSettings settings_; |
| VersionNumber server_version_; |
| VersionNumber dse_server_version_; |
| ListenAddressMap listen_addresses_; |
| ControlConnectionListener* listener_; |
| }; |
| |
| }}} // namespace datastax::internal::core |
| |
| #endif |