blob: dcde346414dad45b60115dcf0ebbc642c74e0861 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "request_handler.hpp"
#include "batch_request.hpp"
#include "connection.hpp"
#include "connection_pool_manager.hpp"
#include "constants.hpp"
#include "error_response.hpp"
#include "execute_request.hpp"
#include "metrics.hpp"
#include "prepare_request.hpp"
#include "protocol.hpp"
#include "response.hpp"
#include "result_response.hpp"
#include "row.hpp"
#include "session.hpp"
#include <uv.h>
using namespace datastax;
using namespace datastax::internal;
using namespace datastax::internal::core;
// Renders the raw bytes of `byte_id` as a lowercase hexadecimal string: each
// input byte produces two characters, high nibble first.
static String to_hex(const String& byte_id) {
  static const char hex_digits[] = "0123456789abcdef";
  OStringStream out;
  const char* cursor = byte_id.data();
  const char* const end = cursor + byte_id.length();
  for (; cursor != end; ++cursor) {
    // Go through uint8_t so bytes >= 0x80 are not sign-extended.
    const uint8_t value = static_cast<uint8_t>(*cursor);
    out << hex_digits[(value >> 4) & 0x0F] << hex_digits[value & 0x0F];
  }
  return out.str();
}
// Query plan that yields a single host created from a fixed address. It is
// used by RequestHandler::init() when a request is pinned to a specific host,
// bypassing the load balancing policy.
class SingleHostQueryPlan : public QueryPlan {
public:
  // `explicit` keeps an Address from being implicitly converted into a plan.
  explicit SingleHostQueryPlan(const Address& address)
      : host_(new Host(address)) {}

  // Hands out the host on the first call; the stored pointer is reset so
  // later calls return whatever an empty Host::Ptr is.
  virtual Host::Ptr compute_next() {
    Host::Ptr temp = host_;
    host_.reset(); // Only return the host once
    return temp;
  }

private:
  Host::Ptr host_;
};
// Callback for the PREPARE request that RequestExecution issues after a host
// reports a statement as unprepared. The outcome is forwarded to the
// originating execution, which is then retried on the current or next host.
class PrepareCallback : public SimpleRequestCallback {
public:
  // `query` is the statement to prepare, `id` the prepared ID the response is
  // compared against, `request_execution` the execution to resume afterwards.
  PrepareCallback(const String& query, const String& id, RequestExecution* request_execution);

private:
  // Prepare request that additionally carries a keyspace and request timeout.
  class PrepareRequest : public core::PrepareRequest {
  public:
    PrepareRequest(const String& query, const String& keyspace, uint64_t request_timeout_ms)
        : core::PrepareRequest(query) {
      set_keyspace(keyspace);
      set_request_timeout_ms(request_timeout_ms);
    }
  };

  // Overridden callback hooks; all of them end up retrying (or failing) the
  // wrapped execution.
  virtual void on_internal_set(ResponseMessage* response);
  virtual void on_internal_error(CassError code, const String& message);
  virtual void on_internal_timeout();

  RequestExecution::Ptr request_execution_; // Execution that triggered the prepare
  String id_;                               // Prepared ID the response is compared against
};
// Builds the PREPARE request for `query`, copying the keyspace and request
// timeout from the request being run by `request_execution`, and remembers the
// expected prepared `id` for the comparison done in on_internal_set().
PrepareCallback::PrepareCallback(const String& query, const String& id,
                                 RequestExecution* request_execution)
    : SimpleRequestCallback(Request::ConstPtr(
          new PrepareRequest(query,
                             request_execution->request()->keyspace(),
                             request_execution->request_timeout_ms())))
    , request_execution_(request_execution)
    , id_(id) {}
// Handles the server's answer to the PREPARE request.
//  - ERROR response, or a RESULT that is not of kind PREPARED: retry the
//    original execution on the next host.
//  - PREPARED with a different ID than expected: fail the request.
//  - PREPARED with the expected ID: publish the new metadata and retry the
//    original execution on the current host.
// Any other opcode is ignored.
void PrepareCallback::on_internal_set(ResponseMessage* response) {
  switch (response->opcode()) {
    case CQL_OPCODE_ERROR:
      request_execution_->on_retry_next_host();
      return;
    case CQL_OPCODE_RESULT:
      break; // Handled below
    default:
      return;
  }

  ResultResponse* prepared = static_cast<ResultResponse*>(response->response_body().get());
  if (prepared->kind() != CASS_RESULT_KIND_PREPARED) {
    request_execution_->on_retry_next_host();
    return;
  }

  String received_id = prepared->prepared_id().to_string();
  if (id_ == received_id) {
    request_execution_->notify_result_metadata_changed(request(), prepared);
    request_execution_->on_retry_current_host();
  } else {
    request_execution_->notify_prepared_id_mismatch(id_, received_id);
  }
}
// Any error while preparing sends the original execution to the next host;
// the error code and message are not inspected.
void PrepareCallback::on_internal_error(CassError /*code*/, const String& /*message*/) {
  request_execution_->on_retry_next_host();
}
// A timed out PREPARE is treated like an error: move on to the next host.
void PrepareCallback::on_internal_timeout() {
  request_execution_->on_retry_next_host();
}
class NopRequestListener : public RequestListener {
public:
virtual void on_prepared_metadata_changed(const String& id,
const PreparedMetadata::Entry::Ptr& entry) {}
virtual void on_keyspace_changed(const String& keyspace, KeyspaceChangedResponse response) {}
virtual bool on_wait_for_tracing_data(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host,
const Response::Ptr& response) {
return false;
}
virtual bool on_wait_for_schema_agreement(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host,
const Response::Ptr& response) {
return false;
}
virtual bool on_prepare_all(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host, const Response::Ptr& response) {
return false;
}
virtual void on_done() {}
};
static NopRequestListener nop_request_listener__;
// Creates a handler for `request` whose outcome is delivered through `future`.
// `metrics` may be NULL (every use is guarded by a null check). The start time
// is captured here, and the no-op listener stays in place until init()
// installs a real one.
RequestHandler::RequestHandler(const Request::ConstPtr& request,
                               const ResponseFuture::Ptr& future,
                               Metrics* metrics)
    : wrapper_(request)
    , future_(future)
    , is_done_(false)
    , running_executions_(0)
    , start_time_ns_(uv_hrtime())
    , listener_(&nop_request_listener__)
    , manager_(NULL)
    , metrics_(metrics) {}
// When trace logging is enabled, logs every recorded attempt as
// "(address, latency)" or "(address, error description)".
RequestHandler::~RequestHandler() {
  if (Logger::log_level() < CASS_LOG_TRACE) {
    return;
  }

  OStringStream attempts;
  const char* separator = "";
  for (RequestTryVec::const_iterator attempt = request_tries_.begin(),
                                     last = request_tries_.end();
       attempt != last; ++attempt) {
    attempts << separator << "(" << attempt->address << ", ";
    if (attempt->error == CASS_OK) {
      attempts << attempt->latency;
    } else {
      attempts << cass_error_desc(attempt->error);
    }
    attempts << ")";
    separator = ", ";
  }
  LOG_TRACE("Speculative execution attempts: [%s]", attempts.str().c_str());
}
// Forwards the prepared metadata entry to the request wrapper.
void RequestHandler::set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry) {
  wrapper_.set_prepared_metadata(entry);
}
// Prepares the handler for execution: stores the pool manager and listener
// (falling back to the no-op listener when `listener` is NULL), initializes
// the request wrapper from `profile`, and builds both the host query plan and
// the speculative execution plan.
void RequestHandler::init(const ExecutionProfile& profile, ConnectionPoolManager* manager,
                          const TokenMap* token_map, TimestampGenerator* timestamp_generator,
                          RequestListener* listener) {
  manager_ = manager;
  if (listener) {
    listener_ = listener;
  } else {
    listener_ = &nop_request_listener__;
  }
  wrapper_.init(profile, timestamp_generator);

  // The statement's own keyspace wins; the session's keyspace is the fallback.
  const String& keyspace(request()->keyspace().empty() ? manager_->keyspace()
                                                       : request()->keyspace());

  // A request pinned to a specific host skips the load balancing policy and
  // gets a plan containing only that host.
  if (request()->host()) {
    query_plan_.reset(new SingleHostQueryPlan(*request()->host()));
  } else {
    query_plan_.reset(profile.load_balancing_policy()->new_query_plan(keyspace, this, token_map));
  }

  execution_plan_.reset(
      profile.speculative_execution_policy()->new_plan(keyspace, wrapper_.request().get()));
}
void RequestHandler::execute() {
RequestExecution::Ptr request_execution(new RequestExecution(this));
running_executions_++;
internal_retry(request_execution.get());
}
// Re-attempts an existing execution. The unnamed `Protected` tag parameter
// presumably restricts who may call this -- confirm against the header.
void RequestHandler::retry(RequestExecution* request_execution, Protected) {
  internal_retry(request_execution);
}
// Arms the request timeout timer on `loop` unless it is already running or
// the request has no timeout configured.
void RequestHandler::start_request(uv_loop_t* loop, Protected) {
  if (timer_.is_running()) {
    return;
  }
  uint64_t timeout_ms = wrapper_.request_timeout_ms();
  if (timeout_ms == 0) { // 0 means no timeout
    return;
  }
  timer_.start(loop, timeout_ms, bind_callback(&RequestHandler::on_timeout, this));
}
// Returns the next host produced by the query plan.
Host::Ptr RequestHandler::next_host(Protected) {
  return query_plan_->compute_next();
}
// Asks the speculative execution plan when the next execution should start.
// RequestExecution::on_write() treats 0 as "start now", a positive value as a
// timer delay, and a negative value as "no further execution".
int64_t RequestHandler::next_execution(const Host::Ptr& current_host, Protected) {
  return execution_plan_->next_execution(current_host);
}
// Records `address` on the response future as a host that was attempted.
void RequestHandler::add_attempted_address(const Address& address, Protected) {
  future_->add_attempted_address(address);
}
// Wraps the new prepared statement information in a PreparedMetadata entry
// and hands it to the listener, keyed by `prepared_id`.
void RequestHandler::notify_result_metadata_changed(const String& prepared_id, const String& query,
                                                    const String& keyspace,
                                                    const String& result_metadata_id,
                                                    const ResultResponse::ConstPtr& result_response,
                                                    Protected) {
  PreparedMetadata::Entry::Ptr updated_entry(
      new PreparedMetadata::Entry(query, keyspace, result_metadata_id, result_response));
  listener_->on_prepared_metadata_changed(prepared_id, updated_entry);
}
// Tells the listener that the keyspace changed. The handler, host and
// response are bundled into a KeyspaceChangedResponse for the listener.
void RequestHandler::notify_keyspace_changed(const String& keyspace, const Host::Ptr& current_host,
                                             const Response::Ptr& response) {
  RequestHandler::Ptr self(this);
  listener_->on_keyspace_changed(keyspace,
                                 KeyspaceChangedResponse(self, current_host, response));
}
// Asks the listener whether it will wait for tracing data. When this returns
// true the caller does not set the response itself.
bool RequestHandler::wait_for_tracing_data(const Host::Ptr& current_host,
                                           const Response::Ptr& response) {
  Ptr self(this);
  return listener_->on_wait_for_tracing_data(self, current_host, response);
}
// Asks the listener whether it will wait for schema agreement. When this
// returns true the caller does not set the response itself.
bool RequestHandler::wait_for_schema_agreement(const Host::Ptr& current_host,
                                               const Response::Ptr& response) {
  Ptr self(this);
  return listener_->on_wait_for_schema_agreement(self, current_host, response);
}
// Asks the listener whether it will handle the "prepare all" step. When this
// returns true the caller does not set the response itself.
bool RequestHandler::prepare_all(const Host::Ptr& current_host, const Response::Ptr& response) {
  Ptr self(this);
  return listener_->on_prepare_all(self, current_host, response);
}
// Completes the request with `response` from `host`, records the matching
// latency metric and, under trace logging, remembers the attempt.
void RequestHandler::set_response(const Host::Ptr& host, const Response::Ptr& response) {
  stop_request();
  running_executions_--;

  const bool accepted = future_->set_response(host->address(), response);
  if (metrics_) {
    if (accepted) {
      metrics_->record_request(uv_hrtime() - start_time_ns_);
    } else {
      // The future already holds a response from another speculative
      // execution, so this one counts as an aborted speculative execution.
      metrics_->record_speculative_request(uv_hrtime() - start_time_ns_);
    }
  }

  if (Logger::log_level() >= CASS_LOG_TRACE) {
    request_tries_.push_back(RequestTry(host->address(), uv_hrtime() - start_time_ns_));
  }
}
// Fails the request with `code`/`message`. A "no hosts available" error is
// swallowed while other executions are still running; that check also
// decrements the running execution count.
void RequestHandler::set_error(CassError code, const String& message) {
  stop_request();
  if (code == CASS_ERROR_LIB_NO_HOSTS_AVAILABLE && --running_executions_ > 0) {
    return; // Another execution may still succeed
  }
  future_->set_error(code, message);
}
// Fails the request with `code`/`message`, attributing the error to `host`
// when one is given. `host` may be empty (e.g. when the query plan has been
// exhausted).
//
// A "no hosts available" error is swallowed while other executions are still
// running; that check also decrements the running execution count.
void RequestHandler::set_error(const Host::Ptr& host, CassError code, const String& message) {
  stop_request();
  bool skip = (code == CASS_ERROR_LIB_NO_HOSTS_AVAILABLE && --running_executions_ > 0);
  if (!skip) {
    if (host) {
      future_->set_error_with_address(host->address(), code, message);
    } else {
      // Set the error on the future directly. Going through
      // set_error(code, message) would run the skip check again and decrement
      // running_executions_ a second time for the same failure.
      future_->set_error(code, message);
    }
  }
  // Only record the attempt when there is a host to attribute it to; the
  // original dereferenced an empty host here when trace logging was enabled.
  if (host && Logger::log_level() >= CASS_LOG_TRACE) {
    request_tries_.push_back(RequestTry(host->address(), code));
  }
}
// Fails the request with a server error response received from `host`.
// `host` is dereferenced unconditionally, so callers must pass a valid host.
void RequestHandler::set_error_with_error_response(const Host::Ptr& host,
                                                   const Response::Ptr& error, CassError code,
                                                   const String& message) {
  stop_request();
  running_executions_--;
  future_->set_error_with_response(host->address(), error, code, message);

  if (Logger::log_level() < CASS_LOG_TRACE) {
    return;
  }
  request_tries_.push_back(RequestTry(host->address(), code));
}
// Stops the request timeout timer without marking the request as done.
void RequestHandler::stop_timer() {
  timer_.stop();
}
void RequestHandler::on_timeout(Timer* timer) {
if (metrics_) {
metrics_->request_timeouts.inc();
}
set_error(CASS_ERROR_LIB_REQUEST_TIMED_OUT, "Request timed out");
LOG_DEBUG("Request timed out");
}
// Marks the request as finished and stops the timeout timer. The listener's
// on_done() runs only the first time; the timer is stopped on every call.
void RequestHandler::stop_request() {
  if (!is_done_) {
    // Notify before flipping the flag, as the original ordering does.
    listener_->on_done();
    is_done_ = true;
  }
  timer_.stop();
}
// Tries to write `request_execution` to a connection of its current host,
// advancing through the execution's hosts until either a write succeeds, a
// non-retryable write error occurs, or no hosts remain. If the request has
// already completed the execution is dropped with a debug log entry.
void RequestHandler::internal_retry(RequestExecution* request_execution) {
// The request already finished (response, error or timeout); this execution
// has nothing left to do.
if (is_done_) {
LOG_DEBUG("Canceling speculative execution (%p) for request (%p) on host %s",
static_cast<void*>(request_execution), static_cast<void*>(this),
request_execution->current_host()
? request_execution->current_host()->address_string().c_str()
: "<no current host>");
return;
}
// Local flag (distinct from the is_done_ member): set once this attempt has
// either been written or failed with a non-retryable error.
bool is_done = false;
while (!is_done && request_execution->current_host()) {
PooledConnection::Ptr connection =
manager_->find_least_busy(request_execution->current_host()->address());
if (connection) {
// A positive result means the request was written; anything else is one of
// the error codes handled below.
int32_t result = connection->write(request_execution);
if (result > 0) {
is_done = true;
} else {
switch (result) {
case SocketRequest::SOCKET_REQUEST_ERROR_CLOSED:
// This should never happen, but retry with next host if it does.
request_execution->next_host();
break;
case SocketRequest::SOCKET_REQUEST_ERROR_NO_HANDLER:
set_error(CASS_ERROR_LIB_WRITE_ERROR,
"Socket is not properly configured with a handler");
is_done = true;
break;
case Request::REQUEST_ERROR_NO_AVAILABLE_STREAM_IDS:
// Retry with next host
request_execution->next_host();
break;
case Request::REQUEST_ERROR_BATCH_WITH_NAMED_VALUES:
case Request::REQUEST_ERROR_PARAMETER_UNSET:
case Request::REQUEST_ERROR_UNSUPPORTED_PROTOCOL:
case Request::REQUEST_ERROR_NO_DATA_WRITTEN:
// Already handled with a specific error.
is_done = true;
break;
default:
set_error(CASS_ERROR_LIB_WRITE_ERROR, "Unspecified write error occurred");
is_done = true;
break;
}
}
} else {
// No connection available on the current host, move to the next host.
request_execution->next_host();
}
}
// The loop ran out of hosts without a successful write. set_error() ignores
// this error while other executions are still running.
if (!request_execution->current_host()) {
set_error(CASS_ERROR_LIB_NO_HOSTS_AVAILABLE, "All hosts in current policy attempted "
"and were either unavailable or failed");
}
}
// Creates an execution bound to `request_handler`. The first host is taken
// from the handler's query plan during construction, and the start time used
// for latency measurements is captured.
RequestExecution::RequestExecution(RequestHandler* request_handler)
    : RequestCallback(request_handler->wrapper())
    , request_handler_(request_handler)
    , current_host_(request_handler->next_host(RequestHandler::Protected()))
    , num_retries_(0)
    , start_time_ns_(uv_hrtime()) {}
void RequestExecution::on_execute_next(Timer* timer) { request_handler_->execute(); }
// Callback entry point: retries the request on the current host.
void RequestExecution::on_retry_current_host() {
  retry_current_host();
}
// Callback entry point: releases the in-flight slot held on the current host
// (if there is one) and retries the request on the next host.
void RequestExecution::on_retry_next_host() {
  if (current_host_) {
    current_host_->decrement_inflight_requests();
  }
  retry_next_host();
}
// Resets the callback state to "new" so the request can be written again,
// then asks the handler to retry this execution.
void RequestExecution::retry_current_host() {
  set_state(REQUEST_STATE_NEW);
  request_handler_->retry(this, RequestHandler::Protected());
}
// Advances to the next host and retries there. Unlike on_retry_next_host(),
// this does not touch the in-flight request counter.
void RequestExecution::retry_next_host() {
  next_host();
  retry_current_host();
}
// Called once the request has been written to `connection`: counts the
// request as in flight on the current host, remembers the connection, starts
// the handler's timeout timer and, for idempotent requests, schedules the
// next speculative execution.
void RequestExecution::on_write(Connection* connection) {
  assert(current_host_ && "Tried to start on a non-existent host");
  current_host_->increment_inflight_requests();
  connection_ = connection;

  if (request()->record_attempted_addresses()) {
    request_handler_->add_attempted_address(current_host_->address(), RequestHandler::Protected());
  }
  request_handler_->start_request(connection->loop(), RequestHandler::Protected());

  // Speculative executions are only attempted for idempotent requests.
  if (!request()->is_idempotent()) {
    return;
  }

  int64_t delay = request_handler_->next_execution(current_host_, RequestHandler::Protected());
  if (delay > 0) {
    schedule_timer_.start(connection->loop(), delay,
                          bind_callback(&RequestExecution::on_execute_next, this));
  } else if (delay == 0) {
    // Zero means "start the next execution right away".
    request_handler_->execute();
  }
  // A negative delay schedules nothing.
}
// Called when a response arrives for this execution. Releases the in-flight
// slot on the current host and dispatches on the response opcode; any opcode
// other than RESULT or ERROR defuncts the connection and fails the request.
void RequestExecution::on_set(ResponseMessage* response) {
  assert(connection_ != NULL);
  assert(current_host_ && "Tried to set on a non-existent host");
  current_host_->decrement_inflight_requests();

  // Work from a local copy of the connection pointer, as the original does.
  Connection* const conn = connection_;
  switch (response->opcode()) {
    case CQL_OPCODE_RESULT:
      on_result_response(conn, response);
      break;
    case CQL_OPCODE_ERROR:
      on_error_response(conn, response);
      break;
    default:
      conn->defunct();
      set_error(CASS_ERROR_LIB_UNEXPECTED_RESPONSE, "Unexpected response");
      break;
  }
}
// Callback entry point for a failed request: releases the in-flight slot on
// the current host (if there is one) and reports the error to the handler.
void RequestExecution::on_error(CassError code, const String& message) {
  if (current_host_) {
    current_host_->decrement_inflight_requests();
  }
  set_error(code, message);
}
// Publishes changed prepared statement metadata through the request handler.
// Only two request/result combinations are valid: EXECUTE with a ROWS result,
// and PREPARE with a PREPARED result. Anything else trips an assertion.
void RequestExecution::notify_result_metadata_changed(const Request* request,
                                                      ResultResponse* result_response) {
  // The per-query keyspace (v5+/DSEv2+ only) is used when the protocol
  // supports it and the request has one; otherwise the keyspace comes from
  // the result metadata.
  String keyspace;
  if (!result_response->protocol_version().supports_set_keyspace() ||
      request->keyspace().empty()) {
    keyspace = result_response->quoted_keyspace();
  } else {
    keyspace = request->keyspace();
  }

  if (request->opcode() == CQL_OPCODE_EXECUTE &&
      result_response->kind() == CASS_RESULT_KIND_ROWS) {
    const ExecuteRequest* execute = static_cast<const ExecuteRequest*>(request);
    request_handler_->notify_result_metadata_changed(
        execute->prepared()->id(), execute->prepared()->query(), keyspace,
        result_response->new_metadata_id().to_string(), ResultResponse::ConstPtr(result_response),
        RequestHandler::Protected());
    return;
  }

  if (request->opcode() == CQL_OPCODE_PREPARE &&
      result_response->kind() == CASS_RESULT_KIND_PREPARED) {
    const PrepareRequest* prepare = static_cast<const PrepareRequest*>(request);
    request_handler_->notify_result_metadata_changed(
        result_response->prepared_id().to_string(), prepare->query(), keyspace,
        result_response->result_metadata_id().to_string(),
        ResultResponse::ConstPtr(result_response), RequestHandler::Protected());
    return;
  }

  assert(false && "Invalid response type for a result metadata change");
}
void RequestExecution::notify_prepared_id_mismatch(const String& expected_id,
const String& received_id) {
OStringStream ss;
ss << "ID mismatch while trying to prepare query (expected ID " << to_hex(expected_id)
<< ", received ID " << to_hex(received_id)
<< "). This prepared statement won't work anymore. This usually happens when you run a "
"'USE...' query after the statement was prepared.";
String message = ss.str();
request_handler_->set_error(CASS_ERROR_LIB_UNEXPECTED_RESPONSE, message);
}
// Handles a RESULT response according to its kind:
//  - ROWS: updates the host latency, fills in or publishes result metadata
//    for EXECUTE requests, then sets the response unless the handler waits
//    for tracing data.
//  - SCHEMA_CHANGE: sets the response unless the handler waits for schema
//    agreement.
//  - SET_KEYSPACE: notifies the handler; the response is set later.
//  - PREPARED: publishes the metadata, then sets the response unless the
//    handler takes over to prepare on all hosts.
//  - anything else: sets the response as-is.
//
// Called from on_set(), which has already released the in-flight slot on the
// current host.
void RequestExecution::on_result_response(Connection* connection, ResponseMessage* response) {
  ResultResponse* result = static_cast<ResultResponse*>(response->response_body().get());
  switch (result->kind()) {
    case CASS_RESULT_KIND_ROWS:
      current_host_->update_latency(uv_hrtime() - start_time_ns_);
      // Execute statements with no metadata get their metadata from
      // result_metadata() returned when the statement was prepared.
      if (request()->opcode() == CQL_OPCODE_EXECUTE) {
        if (result->no_metadata()) {
          if (!skip_metadata()) {
            // Caused by a race condition in C* 2.1.0.
            // Use set_error() rather than on_error(): on_error() decrements
            // the host's in-flight counter, which on_set() already did.
            set_error(CASS_ERROR_LIB_UNEXPECTED_RESPONSE,
                      "Expected metadata but no metadata in response (see CASSANDRA-8054)");
            return;
          }
          result->set_metadata(prepared_metadata_entry()->result()->result_metadata());
        } else if (result->metadata_changed()) {
          notify_result_metadata_changed(request(), result);
        }
      }
      if (!response->response_body()->has_tracing_id() ||
          !request_handler_->wait_for_tracing_data(current_host(), response->response_body())) {
        set_response(response->response_body());
      }
      break;
    case CASS_RESULT_KIND_SCHEMA_CHANGE: {
      if (!request_handler_->wait_for_schema_agreement(current_host(), response->response_body())) {
        set_response(response->response_body());
      }
      break;
    }
    case CASS_RESULT_KIND_SET_KEYSPACE:
      // The response is set after the keyspace is propagated to all threads.
      request_handler_->notify_keyspace_changed(result->quoted_keyspace(), current_host_,
                                                response->response_body());
      break;
    case CASS_RESULT_KIND_PREPARED:
      notify_result_metadata_changed(request(), result);
      if (!request_handler_->prepare_all(current_host(), response->response_body())) {
        set_response(response->response_body());
      }
      break;
    default:
      set_response(response->response_body());
      break;
  }
}
// Handles an ERROR response. Bootstrapping and unprepared errors are handled
// immediately; for the remaining codes the retry policy (when present, and
// for some codes only if the request is idempotent) decides whether to return
// the error, retry, or ignore it. Without a decision the error is returned.
void RequestExecution::on_error_response(Connection* connection, ResponseMessage* response) {
  ErrorResponse* error = static_cast<ErrorResponse*>(response->response_body().get());

  // Default outcome when no retry policy gets to decide.
  RetryPolicy::RetryDecision verdict = RetryPolicy::RetryDecision::return_error();

  switch (error->code()) {
    case CQL_ERROR_IS_BOOTSTRAPPING:
      LOG_ERROR("Query sent to bootstrapping host %s. Retrying on the next host...",
                connection->address_string().c_str());
      retry_next_host();
      return; // Done

    case CQL_ERROR_UNPREPARED:
      on_error_unprepared(connection, error);
      return; // Done

    case CQL_ERROR_READ_TIMEOUT:
      if (retry_policy()) {
        verdict = retry_policy()->on_read_timeout(request(), error->consistency(),
                                                  error->received(), error->required(),
                                                  error->data_present() > 0, num_retries_);
      }
      break;

    case CQL_ERROR_WRITE_TIMEOUT:
      if (retry_policy() && request()->is_idempotent()) {
        verdict =
            retry_policy()->on_write_timeout(request(), error->consistency(), error->received(),
                                             error->required(), error->write_type(), num_retries_);
      }
      break;

    case CQL_ERROR_UNAVAILABLE:
      if (retry_policy()) {
        verdict = retry_policy()->on_unavailable(
            request(), error->consistency(), error->required(), error->received(), num_retries_);
      }
      break;

    case CQL_ERROR_OVERLOADED:
      LOG_WARN("Host %s is overloaded.", connection->address_string().c_str());
      if (retry_policy() && request()->is_idempotent()) {
        verdict = retry_policy()->on_request_error(request(), consistency(), error, num_retries_);
      }
      break;

    case CQL_ERROR_SERVER_ERROR:
      LOG_WARN("Received server error '%s' from host %s. Defuncting the connection...",
               error->message().to_string().c_str(), connection->address_string().c_str());
      connection->defunct();
      if (retry_policy() && request()->is_idempotent()) {
        verdict = retry_policy()->on_request_error(request(), consistency(), error, num_retries_);
      }
      break;

    default:
      // Return the error response
      break;
  }

  // Act on the retry decision
  switch (verdict.type()) {
    case RetryPolicy::RetryDecision::RETRY:
      set_retry_consistency(verdict.retry_consistency());
      if (verdict.retry_current_host()) {
        retry_current_host();
      } else {
        retry_next_host();
      }
      num_retries_++;
      break;

    case RetryPolicy::RetryDecision::IGNORE:
      // Ignoring the error completes the request with an empty result.
      set_response(Response::Ptr(new ResultResponse()));
      break;

    case RetryPolicy::RetryDecision::RETURN_ERROR:
      set_error_with_error_response(
          response->response_body(),
          static_cast<CassError>(CASS_ERROR(CASS_ERROR_SOURCE_SERVER, error->code())),
          error->message().to_string());
      break;
  }
}
// Handles an UNPREPARED error by re-preparing the statement on the same
// connection. The query text comes from the EXECUTE request or is looked up
// by prepared ID in the BATCH request; any other request type defuncts the
// connection and fails the request.
void RequestExecution::on_error_unprepared(Connection* connection, ErrorResponse* error) {
  LOG_DEBUG("Unprepared error response returned for request: %s",
            error->message().to_string().c_str());

  String query;
  String id = error->prepared_id().to_string();

  const bool is_execute = (request()->opcode() == CQL_OPCODE_EXECUTE);
  const bool is_batch = !is_execute && (request()->opcode() == CQL_OPCODE_BATCH);

  if (!is_execute && !is_batch) {
    connection->defunct();
    set_error(CASS_ERROR_LIB_UNEXPECTED_RESPONSE, "Received unprepared error for invalid "
                                                  "request type or invalid prepared id");
    return;
  }

  if (is_execute) {
    const ExecuteRequest* execute = static_cast<const ExecuteRequest*>(request());
    query = execute->prepared()->query();
  } else {
    const BatchRequest* batch = static_cast<const BatchRequest*>(request());
    if (!batch->find_prepared_query(id, &query)) {
      set_error(CASS_ERROR_LIB_UNEXPECTED_RESPONSE,
                "Unable to find prepared statement in batch statement");
      return;
    }
  }

  RequestCallback::Ptr prepare_callback(new PrepareCallback(query, id, this));
  if (connection->write_and_flush(prepare_callback) < 0) {
    // Try to prepare on the same host but on a different connection
    retry_current_host();
  }
}
// Completes the request with `response`, attributed to the current host.
void RequestExecution::set_response(const Response::Ptr& response) {
  request_handler_->set_response(current_host_, response);
}
// Reports an error to the handler, attributed to the current host.
void RequestExecution::set_error(CassError code, const String& message) {
  request_handler_->set_error(current_host_, code, message);
}
// Reports a server error response to the handler, attributed to the current host.
void RequestExecution::set_error_with_error_response(const Response::Ptr& error, CassError code,
                                                     const String& message) {
  request_handler_->set_error_with_error_response(current_host_, error, code, message);
}