blob: 3f1ce25ecbe2aead3d7ad6ebb564e1743a792829 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "control_connection.hpp"
#include "collection_iterator.hpp"
#include "constants.hpp"
#include "error_response.hpp"
#include "event_response.hpp"
#include "load_balancing.hpp"
#include "logger.hpp"
#include "metadata.hpp"
#include "query_request.hpp"
#include "result_iterator.hpp"
#include "result_response.hpp"
#include "session.hpp"
#include "timer.hpp"
#include "utils.hpp"
#include "vector.hpp"
#include <algorithm>
#include <iomanip>
#include <iterator>
using namespace datastax;
using namespace datastax::internal::core;
namespace datastax { namespace internal { namespace core {
/**
* A class for handling a single query on behalf of the control connection.
*/
class ControlRequestCallback : public SimpleRequestCallback {
public:
typedef SharedRefPtr<ControlRequestCallback> Ptr;
typedef void (*Callback)(ControlRequestCallback*);
/**
* Constructor. Initialize with a query.
*
* @param query The query to run.
* @param control_connection The control connection the query is run on.
* @param callback A callback that handles a successful query.
*/
ControlRequestCallback(const String& query, ControlConnection* control_connection,
Callback callback)
: SimpleRequestCallback(query)
, control_connection_(control_connection)
, callback_(callback) {}
/**
* Constructor. Initialize with a request object.
*
* @param request The request to run.
* @param control_connection The control connection the request is run on.
* @param callback A callback that handles a successful request.
*/
ControlRequestCallback(const Request::ConstPtr& request, ControlConnection* control_connection,
Callback callback)
: SimpleRequestCallback(request)
, control_connection_(control_connection)
, callback_(callback) {}
virtual void on_internal_set(ResponseMessage* response) {
if (response->opcode() != CQL_OPCODE_RESULT) {
control_connection_->defunct();
return;
}
result_ = ResultResponse::Ptr(response->response_body());
callback_(this);
}
virtual void on_internal_error(CassError code, const String& message) {
control_connection_->defunct();
}
virtual void on_internal_timeout() { control_connection_->defunct(); }
ControlConnection* control_connection() { return control_connection_; }
const ResultResponse::Ptr& result() const { return result_; }
private:
ControlConnection* control_connection_;
Callback callback_;
ResultResponse::Ptr result_;
};
/**
* A class for handling multiple simultaneous queries on behalf of the control
* connection.
*/
class ChainedControlRequestCallback : public ChainedRequestCallback {
public:
typedef SharedRefPtr<ChainedControlRequestCallback> Ptr;
typedef void (*Callback)(ChainedControlRequestCallback*);
/**
* Constructor.
*
* @param key The key of the first query. Used to reference the query result.
* @param query The first query.
* @param control_connection The control connection to run the queries on.
* @param callback The callback for a successful run of all the queries.
*/
ChainedControlRequestCallback(const String& key, const String& query,
ControlConnection* control_connection, Callback callback)
: ChainedRequestCallback(key, query)
, control_connection_(control_connection)
, callback_(callback) {}
virtual void on_chain_set() {
for (Map::const_iterator it = responses().begin(), end = responses().end(); it != end; ++it) {
if (it->second->opcode() != CQL_OPCODE_RESULT) {
control_connection_->defunct();
return;
}
}
callback_(this);
}
virtual void on_chain_error(CassError code, const String& message) {
control_connection_->defunct();
}
virtual void on_chain_timeout() { control_connection_->defunct(); }
ControlConnection* control_connection() { return control_connection_; }
private:
ControlConnection* control_connection_;
Callback callback_;
};
/**
* A specialized request callback for handling node queries. This is needed for
* new node and node moved events.
*/
class RefreshNodeCallback : public ControlRequestCallback {
public:
/**
* Constructor.
*
* @param address The address of the host that changed.
* @param type The type of node change.
* @param is_all_peers If true then the whole "system.peers" table is queried
* instead of querying by the peer's primary key.
* @param query The query to run for the node change.
* @param control_connection The control connection the query is run on.
*/
RefreshNodeCallback(const Address& address, ControlConnection::RefreshNodeType type,
bool is_all_peers, const String& query, ControlConnection* control_connection)
: ControlRequestCallback(query, control_connection, ControlConnection::on_refresh_node)
, address(address)
, type(type)
, is_all_peers(is_all_peers) {}
const Address address;
const ControlConnection::RefreshNodeType type;
const bool is_all_peers;
};
/**
* A specialized request callback for keyspace queries. This is needed for
* keyspace change events.
*/
class RefreshKeyspaceCallback : public ControlRequestCallback {
public:
/**
* Constructor.
*
* @param keyspace_name The name of the keyspace that changed.
* @param query The query to run for the keyspace change.
* @param control_connection The control connection the query is run on.
*/
RefreshKeyspaceCallback(const String& keyspace_name, const String& query,
ControlConnection* control_connection)
: ControlRequestCallback(query, control_connection, ControlConnection::on_refresh_keyspace)
, keyspace_name(keyspace_name) {}
const String keyspace_name;
};
/**
* A specialized request callback for table queries. This is needed for
* table change events. Table/Materialized View changes require querying
* multiple tables (tables, columns and indexes) to get all the metadata
* necessary.
*/
class RefreshTableCallback : public ChainedControlRequestCallback {
public:
/**
* Constructor.
*
* @param keyspace_name The name of the table/view's keyspace.
* @param table_or_view_name The name of the table/view that changed.
* @param key The query key of the first query.
* @param query The first query to run.
* @param control_connection The control connection to run the queries on.
*/
RefreshTableCallback(const String& keyspace_name, const String& table_or_view_name,
const String& key, const String& query,
ControlConnection* control_connection)
: ChainedControlRequestCallback(key, query, control_connection,
ControlConnection::on_refresh_table_or_view)
, keyspace_name(keyspace_name)
, table_or_view_name(table_or_view_name) {}
const String keyspace_name;
const String table_or_view_name;
};
/**
* A specialized request callback for user type queries. This is needed for
* user type change events.
*/
class RefreshTypeCallback : public ControlRequestCallback {
public:
/**
* Constructor.
*
* @param keyspace_name The name of the type's keyspace.
* @param type_name The name of the type that changed.
* @param query The query to run for the type change.
* @param control_connection The control connection to run the query on.
*/
RefreshTypeCallback(const String& keyspace_name, const String& type_name, const String& query,
ControlConnection* control_connection)
: ControlRequestCallback(query, control_connection, ControlConnection::on_refresh_type)
, keyspace_name(keyspace_name)
, type_name(type_name) {}
const String keyspace_name;
const String type_name;
};
/**
* A specialized request callback for function queries. This is needed for
* function change events.
*/
class RefreshFunctionCallback : public ControlRequestCallback {
public:
typedef Vector<String> StringVec;
/**
* Constructor.
*
* @param keyspace_name The name of the function/aggregate's keyspace.
* @param function_name The name of the function/aggregate that changed.
* @param arg_types The function/aggregate's argument types.
* @param is_aggregate True if the function is an aggregate.
* @param request The request to run for the function/aggregate change.
* @param control_connection The control connection to run the query on.
*/
RefreshFunctionCallback(const String& keyspace_name, const String& function_name,
const StringVec& arg_types, bool is_aggregate,
const Request::ConstPtr& request, ControlConnection* control_connection)
: ControlRequestCallback(request, control_connection, ControlConnection::on_refresh_function)
, keyspace_name(keyspace_name)
, function_name(function_name)
, arg_types(arg_types)
, is_aggregate(is_aggregate) {}
const String keyspace_name;
const String function_name;
const StringVec arg_types;
const bool is_aggregate;
};
/**
* A no operation control connection listener. This is used if no listener
* is set.
*/
class NopControlConnectionListener : public ControlConnectionListener {
public:
virtual void on_up(const Address& address) {}
virtual void on_down(const Address& address) {}
virtual void on_add(const Host::Ptr& host) {}
virtual void on_remove(const Address& address) {}
virtual void on_update_schema(SchemaType type, const ResultResponse::Ptr& result,
const String& keyspace_name, const String& target_name) {}
virtual void on_drop_schema(SchemaType type, const String& keyspace_name,
const String& target_name) {}
virtual void on_close(ControlConnection* connection) {}
};
}}} // namespace datastax::internal::core
static NopControlConnectionListener nop_listener__;
ControlConnectionSettings::ControlConnectionSettings()
: use_schema(CASS_DEFAULT_USE_SCHEMA)
, use_token_aware_routing(CASS_DEFAULT_USE_TOKEN_AWARE_ROUTING)
, address_factory(new AddressFactory()) {}
ControlConnectionSettings::ControlConnectionSettings(const Config& config)
: connection_settings(config)
, use_schema(config.use_schema())
, use_token_aware_routing(config.token_aware_routing())
, address_factory(create_address_factory_from_config(config)) {}
ControlConnector::ControlConnector(const Host::Ptr& host, ProtocolVersion protocol_version,
const Callback& callback)
: connector_(
new Connector(host, protocol_version, bind_callback(&ControlConnector::on_connect, this)))
, callback_(callback)
, error_code_(CONTROL_CONNECTION_OK)
, listener_(NULL)
, metrics_(NULL) {}
ControlConnection::ControlConnection(const Connection::Ptr& connection,
ControlConnectionListener* listener,
const ControlConnectionSettings& settings,
const VersionNumber& server_version,
const VersionNumber& dse_server_version,
ListenAddressMap listen_addresses)
: connection_(connection)
, settings_(settings)
, server_version_(server_version)
, dse_server_version_(dse_server_version)
, listen_addresses_(listen_addresses)
, listener_(listener ? listener : &nop_listener__) {
connection_->set_listener(this);
inc_ref();
}
int32_t ControlConnection::write_and_flush(const RequestCallback::Ptr& callback) {
// Update the current time of the event loop because processing the token map
// and schema metadata could take a bit of time.
uv_update_time(connection_->loop());
return connection_->write_and_flush(callback);
}
void ControlConnection::close() { connection_->close(); }
void ControlConnection::defunct() { connection_->defunct(); }
void ControlConnection::set_listener(ControlConnectionListener* listener) {
listener_ = listener ? listener : &nop_listener__;
}
void ControlConnection::refresh_node(RefreshNodeType type, const Address& address) {
bool is_connected_host = connection_->host()->rpc_address().equals(address, false);
String query;
bool is_all_peers = false;
String listen_address(listen_addresses_[address]);
if (is_connected_host) {
query.assign(SELECT_LOCAL);
} else if (!listen_address.empty()) {
query.assign(SELECT_PEERS);
query.append(" WHERE peer = '");
query.append(listen_address);
query.append("'");
} else {
is_all_peers = true;
query.assign(SELECT_PEERS);
}
LOG_DEBUG("Refresh node: %s", query.c_str());
RequestCallback::Ptr callback(new RefreshNodeCallback(address, type, is_all_peers, query, this));
if (write_and_flush(callback) < 0) {
LOG_ERROR("No more stream available while attempting to refresh node info");
defunct();
}
}
void ControlConnection::on_refresh_node(ControlRequestCallback* callback) {
RefreshNodeCallback* refresh_callback = static_cast<RefreshNodeCallback*>(callback);
refresh_callback->control_connection()->handle_refresh_node(refresh_callback);
}
void ControlConnection::handle_refresh_node(RefreshNodeCallback* callback) {
bool found_host = false;
const Row* row = NULL;
ResultIterator rows(callback->result().get());
if (callback->is_all_peers) {
while (!found_host && rows.next()) {
row = rows.row();
if (settings_.address_factory->is_peer(row, connection_->host(), callback->address)) {
found_host = true;
}
}
} else if (rows.next()) {
row = rows.row();
found_host = true;
}
if (!found_host) {
String address_str = callback->address.to_string();
LOG_ERROR("No row found for host %s in %s's peers system table. "
"%s will be ignored.",
address_str.c_str(), address_string().c_str(), address_str.c_str());
return;
}
Address address;
if (settings_.address_factory->create(row, connection_->host(), &address)) {
Host::Ptr host(new Host(address));
host->set(row, settings_.use_token_aware_routing);
listen_addresses_[host->rpc_address()] = determine_listen_address(address, row);
switch (callback->type) {
case NEW_NODE:
listener_->on_add(host);
break;
case MOVED_NODE:
listener_->on_remove(host->address());
listener_->on_add(host);
break;
}
}
}
void ControlConnection::refresh_keyspace(const StringRef& keyspace_name) {
String query;
if (server_version_ >= VersionNumber(3, 0, 0)) {
query.assign(SELECT_KEYSPACES_30);
} else {
query.assign(SELECT_KEYSPACES_20);
}
query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("'");
LOG_DEBUG("Refreshing keyspace %s", query.c_str());
RequestCallback::Ptr callback(
new RefreshKeyspaceCallback(keyspace_name.to_string(), query, this));
if (write_and_flush(callback) < 0) {
LOG_ERROR("No more stream available while attempting to refresh keyspace info");
defunct();
}
}
void ControlConnection::on_refresh_keyspace(ControlRequestCallback* callback) {
RefreshKeyspaceCallback* refresh_callback = static_cast<RefreshKeyspaceCallback*>(callback);
refresh_callback->control_connection()->handle_refresh_keyspace(refresh_callback);
}
void ControlConnection::handle_refresh_keyspace(RefreshKeyspaceCallback* callback) {
const ResultResponse::Ptr result = callback->result();
if (result->row_count() == 0) {
LOG_ERROR("No row found for keyspace %s in system schema table.",
callback->keyspace_name.c_str());
return;
}
listener_->on_update_schema(ControlConnectionListener::KEYSPACE, result, callback->keyspace_name);
}
void ControlConnection::refresh_table_or_view(const StringRef& keyspace_name,
const StringRef& table_or_view_name) {
String table_query;
String view_query;
String column_query;
String index_query;
if (server_version_ >= VersionNumber(3, 0, 0)) {
table_query.assign(SELECT_TABLES_30);
table_query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("' AND table_name='")
.append(table_or_view_name.data(), table_or_view_name.size())
.append("'");
view_query.assign(SELECT_VIEWS_30);
view_query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("' AND view_name='")
.append(table_or_view_name.data(), table_or_view_name.size())
.append("'");
column_query.assign(SELECT_COLUMNS_30);
column_query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("' AND table_name='")
.append(table_or_view_name.data(), table_or_view_name.size())
.append("'");
index_query.assign(SELECT_INDEXES_30);
index_query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("' AND table_name='")
.append(table_or_view_name.data(), table_or_view_name.size())
.append("'");
LOG_DEBUG("Refreshing table/view %s; %s; %s; %s", table_query.c_str(), view_query.c_str(),
column_query.c_str(), index_query.c_str());
} else {
table_query.assign(SELECT_COLUMN_FAMILIES_20);
table_query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='")
.append(table_or_view_name.data(), table_or_view_name.size())
.append("'");
column_query.assign(SELECT_COLUMNS_20);
column_query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("' AND columnfamily_name='")
.append(table_or_view_name.data(), table_or_view_name.size())
.append("'");
LOG_DEBUG("Refreshing table %s; %s", table_query.c_str(), column_query.c_str());
}
ChainedRequestCallback::Ptr callback(new RefreshTableCallback(
keyspace_name.to_string(), table_or_view_name.to_string(), "tables", table_query, this));
callback = callback->chain("columns", column_query);
if (!view_query.empty()) {
callback = callback->chain("views", view_query);
}
if (!index_query.empty()) {
callback = callback->chain("indexes", index_query);
}
if (write_and_flush(callback) < 0) {
LOG_ERROR("No more stream available while attempting to refresh table/view info");
defunct();
}
}
void ControlConnection::on_refresh_table_or_view(ChainedControlRequestCallback* callback) {
RefreshTableCallback* refresh_callback = static_cast<RefreshTableCallback*>(callback);
refresh_callback->control_connection()->handle_refresh_table_or_view(refresh_callback);
}
void ControlConnection::handle_refresh_table_or_view(RefreshTableCallback* callback) {
ResultResponse::Ptr tables_result(callback->result("tables"));
if (!tables_result || tables_result->row_count() == 0) {
ResultResponse::Ptr views_result(callback->result("views"));
if (!views_result || views_result->row_count() == 0) {
LOG_ERROR("No row found for table (or view) %s.%s in system schema tables.",
callback->keyspace_name.c_str(), callback->table_or_view_name.c_str());
return;
}
listener_->on_update_schema(ControlConnectionListener::VIEW, views_result,
callback->keyspace_name, callback->table_or_view_name);
} else {
listener_->on_update_schema(ControlConnectionListener::TABLE, tables_result,
callback->keyspace_name, callback->table_or_view_name);
}
ResultResponse::Ptr columns_result(callback->result("columns"));
if (columns_result) {
listener_->on_update_schema(ControlConnectionListener::COLUMN, columns_result,
callback->keyspace_name, callback->table_or_view_name);
}
ResultResponse::Ptr indexes_result(callback->result("indexes"));
if (indexes_result) {
listener_->on_update_schema(ControlConnectionListener::INDEX, indexes_result,
callback->keyspace_name, callback->table_or_view_name);
}
}
void ControlConnection::refresh_type(const StringRef& keyspace_name, const StringRef& type_name) {
String query;
if (server_version_ >= VersionNumber(3, 0, 0)) {
query.assign(SELECT_USERTYPES_30);
} else {
query.assign(SELECT_USERTYPES_21);
}
query.append(" WHERE keyspace_name='")
.append(keyspace_name.data(), keyspace_name.size())
.append("' AND type_name='")
.append(type_name.data(), type_name.size())
.append("'");
LOG_DEBUG("Refreshing type %s", query.c_str());
RequestCallback::Ptr callback(
new RefreshTypeCallback(keyspace_name.to_string(), type_name.to_string(), query, this));
if (write_and_flush(callback) < 0) {
LOG_ERROR("No more stream available while attempting to refresh type info");
defunct();
}
}
void ControlConnection::on_refresh_type(ControlRequestCallback* callback) {
RefreshTypeCallback* refresh_callback = static_cast<RefreshTypeCallback*>(callback);
refresh_callback->control_connection()->handle_refresh_type(refresh_callback);
}
void ControlConnection::handle_refresh_type(RefreshTypeCallback* callback) {
const ResultResponse::Ptr result = callback->result();
if (result->row_count() == 0) {
LOG_ERROR("No row found for keyspace %s and type %s in system schema.",
callback->keyspace_name.c_str(), callback->type_name.c_str());
return;
}
listener_->on_update_schema(ControlConnectionListener::USER_TYPE, result, callback->keyspace_name,
callback->type_name);
}
void ControlConnection::refresh_function(const StringRef& keyspace_name,
const StringRef& function_name,
const StringRefVec& arg_types, bool is_aggregate) {
String query;
if (server_version_ >= VersionNumber(3, 0, 0)) {
if (is_aggregate) {
query.assign(SELECT_AGGREGATES_30);
query.append(" WHERE keyspace_name=? AND aggregate_name=? AND argument_types=?");
} else {
query.assign(SELECT_FUNCTIONS_30);
query.append(" WHERE keyspace_name=? AND function_name=? AND argument_types=?");
}
} else {
if (is_aggregate) {
query.assign(SELECT_AGGREGATES_22);
query.append(" WHERE keyspace_name=? AND aggregate_name=? AND signature=?");
} else {
query.assign(SELECT_FUNCTIONS_22);
query.append(" WHERE keyspace_name=? AND function_name=? AND signature=?");
}
}
LOG_DEBUG("Refreshing %s %s in keyspace %s", is_aggregate ? "aggregate" : "function",
Metadata::full_function_name(function_name.to_string(), to_strings(arg_types)).c_str(),
String(keyspace_name.data(), keyspace_name.length()).c_str());
SharedRefPtr<QueryRequest> request(new QueryRequest(query, 3));
SharedRefPtr<Collection> signature(new Collection(CASS_COLLECTION_TYPE_LIST, arg_types.size()));
for (StringRefVec::const_iterator i = arg_types.begin(), end = arg_types.end(); i != end; ++i) {
signature->append(CassString(i->data(), i->size()));
}
request->set(0, CassString(keyspace_name.data(), keyspace_name.size()));
request->set(1, CassString(function_name.data(), function_name.size()));
request->set(2, signature.get());
RequestCallback::Ptr callback(
new RefreshFunctionCallback(keyspace_name.to_string(), function_name.to_string(),
to_strings(arg_types), is_aggregate, request, this));
if (write_and_flush(callback) < 0) {
LOG_ERROR("No more stream available while attempting to refresh function info");
defunct();
}
}
void ControlConnection::on_refresh_function(ControlRequestCallback* callback) {
RefreshFunctionCallback* refresh_callback = static_cast<RefreshFunctionCallback*>(callback);
refresh_callback->control_connection()->handle_refresh_function(refresh_callback);
}
void ControlConnection::handle_refresh_function(RefreshFunctionCallback* callback) {
const ResultResponse::Ptr result = callback->result();
if (result->row_count() == 0) {
LOG_ERROR("No row found for keyspace %s and %s %s", callback->keyspace_name.c_str(),
callback->is_aggregate ? "aggregate" : "function",
Metadata::full_function_name(callback->function_name, callback->arg_types).c_str());
return;
}
listener_->on_update_schema(
callback->is_aggregate ? ControlConnectionListener::AGGREGATE
: ControlConnectionListener::FUNCTION,
result, callback->keyspace_name,
Metadata::full_function_name(callback->function_name, callback->arg_types));
}
void ControlConnection::on_close(Connection* connection) {
listener_->on_close(this);
dec_ref();
}
void ControlConnection::on_event(const EventResponse::Ptr& response) {
switch (response->event_type()) {
case CASS_EVENT_TOPOLOGY_CHANGE: {
String address_str = response->affected_node().to_string();
switch (response->topology_change()) {
case EventResponse::NEW_NODE: {
LOG_INFO("New node %s added event", address_str.c_str());
refresh_node(NEW_NODE, response->affected_node());
break;
}
case EventResponse::REMOVED_NODE: {
LOG_INFO("Node %s removed event", address_str.c_str());
listen_addresses_.erase(response->affected_node());
listener_->on_remove(response->affected_node());
break;
}
case EventResponse::MOVED_NODE:
LOG_INFO("Node %s moved event", address_str.c_str());
refresh_node(MOVED_NODE, response->affected_node());
break;
}
break;
}
case CASS_EVENT_STATUS_CHANGE: {
String address_str = response->affected_node().to_string();
switch (response->status_change()) {
case EventResponse::UP: {
LOG_DEBUG("Node %s is up event", address_str.c_str());
listener_->on_up(response->affected_node());
break;
}
case EventResponse::DOWN: {
LOG_DEBUG("Node %s is down event", address_str.c_str());
listener_->on_down(response->affected_node());
break;
}
}
break;
}
case CASS_EVENT_SCHEMA_CHANGE:
// Only handle keyspace events when using token-aware routing
if (!settings_.use_schema && response->schema_change_target() != EventResponse::KEYSPACE) {
return;
}
LOG_DEBUG("Schema change (%d): %.*s %.*s", response->schema_change(),
(int)response->keyspace().size(), response->keyspace().data(),
(int)response->target().size(), response->target().data());
switch (response->schema_change()) {
case EventResponse::CREATED:
case EventResponse::UPDATED:
switch (response->schema_change_target()) {
case EventResponse::KEYSPACE:
refresh_keyspace(response->keyspace());
break;
case EventResponse::TABLE:
refresh_table_or_view(response->keyspace(), response->target());
break;
case EventResponse::TYPE:
refresh_type(response->keyspace(), response->target());
break;
case EventResponse::FUNCTION:
case EventResponse::AGGREGATE:
refresh_function(response->keyspace(), response->target(), response->arg_types(),
response->schema_change_target() == EventResponse::AGGREGATE);
break;
}
break;
case EventResponse::DROPPED:
switch (response->schema_change_target()) {
case EventResponse::KEYSPACE:
listener_->on_drop_schema(ControlConnectionListener::KEYSPACE,
response->keyspace().to_string(),
response->target().to_string());
break;
case EventResponse::TABLE:
listener_->on_drop_schema(ControlConnectionListener::TABLE,
response->keyspace().to_string(),
response->target().to_string());
break;
case EventResponse::TYPE:
listener_->on_drop_schema(ControlConnectionListener::USER_TYPE,
response->keyspace().to_string(),
response->target().to_string());
break;
case EventResponse::FUNCTION:
listener_->on_drop_schema(
ControlConnectionListener::FUNCTION, response->keyspace().to_string(),
Metadata::full_function_name(response->target().to_string(),
to_strings(response->arg_types())));
break;
case EventResponse::AGGREGATE:
listener_->on_drop_schema(
ControlConnectionListener::AGGREGATE, response->keyspace().to_string(),
Metadata::full_function_name(response->target().to_string(),
to_strings(response->arg_types())));
break;
}
break;
}
break;
default:
assert(false);
break;
}
}