// blob: b2903220c3acd2dbe57520b5c776240b49dc7596 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "request_callback.hpp"
#include "connection.hpp"
#include "constants.hpp"
#include "execute_request.hpp"
#include "execution_profile.hpp"
#include "logger.hpp"
#include "metrics.hpp"
#include "query_request.hpp"
#include "request.hpp"
#include "result_response.hpp"
#include "serialization.hpp"
using namespace datastax;
using namespace datastax::internal::core;
// Remembers the given prepared metadata entry on this wrapper, replacing
// whatever entry was stored before.
void RequestWrapper::set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry) {
  this->prepared_metadata_entry_ = entry;
}
// Fills in the per-request settings of this wrapper.
//
// profile:             source of the consistency, serial consistency, request
//                      timeout (ms) and retry policy.
// timestamp_generator: asked for the next timestamp; it is dereferenced
//                      unconditionally, so it must not be null.
void RequestWrapper::init(const ExecutionProfile& profile,
                          TimestampGenerator* timestamp_generator) {
  // Settings copied straight from the execution profile.
  consistency_ = profile.consistency();
  serial_consistency_ = profile.serial_consistency();
  request_timeout_ms_ = profile.request_timeout_ms();
  retry_policy_ = profile.retry_policy();

  // The timestamp comes from the generator, not from the profile.
  timestamp_ = timestamp_generator->next();
}
// Records the stream id and the connection's protocol version on this
// callback and then hands control to on_write(). Both values are stored
// before on_write() runs, and encode() reads them later.
void RequestCallback::notify_write(Connection* connection, int stream) {
  stream_ = stream;
  protocol_version_ = connection->protocol_version();
  on_write(connection);
}
// Returns true when the result metadata can be skipped: the request is an
// EXECUTE request and a prepared metadata entry with result metadata is
// already available.
bool RequestCallback::skip_metadata() const {
  // Only execute requests are candidates.
  if (request()->opcode() != CQL_OPCODE_EXECUTE) {
    return false;
  }
  // Skip the metadata if this is an execute request and we have an entry cached
  return prepared_metadata_entry() && prepared_metadata_entry()->result()->result_metadata();
}
// Encodes the wrapped request, preceded by its header, into `bufs`.
//
// A placeholder buffer is appended first so the header lands in front of the
// body; it is overwritten once the body length is known.
//
// Returns the header size plus the body length on success. On failure returns
// a negative value: Request::REQUEST_ERROR_UNSUPPORTED_PROTOCOL (after
// reporting through on_error()) when the protocol version is below the lowest
// supported one, or whatever negative code the request's own encode() returned.
int32_t RequestCallback::encode(BufferVec* bufs) {
  const ProtocolVersion version = protocol_version_;
  if (version < ProtocolVersion::lowest_supported()) {
    on_error(CASS_ERROR_LIB_MESSAGE_ENCODE, "Operation unsupported by this protocol version");
    return Request::REQUEST_ERROR_UNSUPPORTED_PROTOCOL;
  }

  // Reserve the header slot; its index is needed to fill it in afterwards.
  const size_t header_index = bufs->size();
  bufs->push_back(Buffer()); // Placeholder

  const Request* req = request();
  int header_flags = req->flags();
  int32_t body_length = 0;

  if (version.is_beta()) {
    header_flags |= CASS_FLAG_BETA;
  }

  // The custom payload is only written for protocol v4 and above.
  if (version >= CASS_PROTOCOL_VERSION_V4 && req->has_custom_payload()) {
    header_flags |= CASS_FLAG_CUSTOM_PAYLOAD;
    body_length += req->encode_custom_payload(bufs);
  }

  const int32_t encoded = req->encode(version, this, bufs);
  if (encoded < 0) {
    return encoded;
  }
  body_length += encoded;

  // Header fields in order: version, flags, stream, opcode, body length.
  const size_t header_size = CASS_HEADER_SIZE_V3;
  Buffer header(header_size);
  size_t offset = header.encode_byte(0, static_cast<uint8_t>(version.value()));
  offset = header.encode_byte(offset, static_cast<uint8_t>(header_flags));
  offset = header.encode_int16(offset, static_cast<int16_t>(stream_));
  offset = header.encode_byte(offset, req->opcode());
  header.encode_int32(offset, body_length);

  (*bufs)[header_index] = header;
  return static_cast<int32_t>(body_length + header_size);
}
void RequestCallback::on_close() {
switch (state()) {
case RequestCallback::REQUEST_STATE_NEW:
case RequestCallback::REQUEST_STATE_FINISHED:
assert(false && "Request state is invalid in cleanup");
break;
case RequestCallback::REQUEST_STATE_READ_BEFORE_WRITE:
set_state(RequestCallback::REQUEST_STATE_FINISHED);
// Use the response saved in the read callback
on_set(read_before_write_response());
break;
case RequestCallback::REQUEST_STATE_WRITING:
case RequestCallback::REQUEST_STATE_READING:
set_state(RequestCallback::REQUEST_STATE_FINISHED);
if (request()->is_idempotent()) {
on_retry_next_host();
} else {
on_error(CASS_ERROR_LIB_REQUEST_TIMED_OUT, "Request timed out");
}
break;
}
}
// Moves the request to `next_state` if that transition is allowed from the
// current state. Allowed transitions:
//
//   NEW               -> NEW, WRITING
//   WRITING           -> READING, READ_BEFORE_WRITE, FINISHED
//   READING           -> FINISHED
//   READ_BEFORE_WRITE -> FINISHED
//   FINISHED          -> NEW
//
// Any other transition asserts; when asserts are compiled out the state is
// left unchanged.
void RequestCallback::set_state(RequestCallback::State next_state) {
  // Each case rejects disallowed targets and returns; falling out of the
  // switch means the transition is legal.
  switch (state_) {
    case REQUEST_STATE_NEW:
      if (next_state != REQUEST_STATE_NEW && next_state != REQUEST_STATE_WRITING) {
        assert(false && "Invalid request state after new");
        return;
      }
      break;

    case REQUEST_STATE_WRITING:
      if (next_state != REQUEST_STATE_READING &&
          next_state != REQUEST_STATE_READ_BEFORE_WRITE &&
          next_state != REQUEST_STATE_FINISHED) {
        assert(false && "Invalid request state after writing");
        return;
      }
      break;

    case REQUEST_STATE_READING:
      if (next_state != REQUEST_STATE_FINISHED) {
        assert(false && "Invalid request state after reading");
        return;
      }
      break;

    case REQUEST_STATE_READ_BEFORE_WRITE:
      if (next_state != REQUEST_STATE_FINISHED) {
        assert(false && "Invalid request state after read before write");
        return;
      }
      break;

    case REQUEST_STATE_FINISHED:
      if (next_state != REQUEST_STATE_NEW) {
        assert(false && "Invalid request state after finished");
        return;
      }
      break;

    default:
      assert(false && "Invalid request state");
      return;
  }

  state_ = next_state;
}
// Returns a constant, human-readable name for the current state, or
// "INVALID" when the stored value matches none of the known states.
const char* RequestCallback::state_string() const {
  const char* name = "INVALID";
  switch (state_) {
    case REQUEST_STATE_NEW:
      name = "NEW";
      break;
    case REQUEST_STATE_WRITING:
      name = "WRITING";
      break;
    case REQUEST_STATE_READING:
      name = "READING";
      break;
    case REQUEST_STATE_READ_BEFORE_WRITE:
      name = "READ_BEFORE_WRITE";
      break;
    case REQUEST_STATE_FINISHED:
      name = "FINISHED";
      break;
  }
  return name;
}
// Builds a callback around a newly allocated QueryRequest for `query`,
// wrapped together with `request_timeout_ms` (on_write() treats a value of 0
// as "no timeout").
SimpleRequestCallback::SimpleRequestCallback(const String& query, uint64_t request_timeout_ms)
: RequestCallback(
RequestWrapper(Request::ConstPtr(new QueryRequest(query)), request_timeout_ms)) {}
// Arms the request timer on the connection's loop (unless the timeout is 0)
// and then forwards to on_internal_write().
void SimpleRequestCallback::on_write(Connection* connection) {
  const uint64_t timeout_ms = request_timeout_ms();
  if (timeout_ms != 0) { // 0 means no timeout
    timer_.start(connection->loop(), timeout_ms,
                 bind_callback(&SimpleRequestCallback::on_timeout, this));
  }
  on_internal_write(connection);
}
// A response arrived: stop the request timer first, then pass the response
// on to on_internal_set().
void SimpleRequestCallback::on_set(ResponseMessage* response) {
  this->timer_.stop();
  this->on_internal_set(response);
}
// An error was reported: stop the request timer first, then pass the code
// and message on to on_internal_error().
void SimpleRequestCallback::on_error(CassError code, const String& message) {
  this->timer_.stop();
  this->on_internal_error(code, message);
}
void SimpleRequestCallback::on_retry_current_host() {
timer_.stop();
on_internal_timeout(); // Retries are unhandled so timeout
}
void SimpleRequestCallback::on_retry_next_host() {
on_retry_current_host(); // Same as retry current (timeout)
}
// Timer callback bound in on_write(): reports the timeout through
// on_internal_timeout() and then emits a debug log line. The `timer`
// argument is not used.
void SimpleRequestCallback::on_timeout(Timer* timer) {
  this->on_internal_timeout();
  LOG_DEBUG("Request timed out (internal)");
}
// Creates a chained callback for a query string.
//
// key:   stored in key_; maybe_finish() files this callback's response under it.
// query: forwarded to the SimpleRequestCallback constructor.
// chain: stored in chain_; may be empty (the code checks chain_ before use).
//
// Both has_pending_ and has_error_or_timeout_ start out false.
ChainedRequestCallback::ChainedRequestCallback(const String& key, const String& query,
const Ptr& chain)
: SimpleRequestCallback(query)
, chain_(chain)
, has_pending_(false)
, has_error_or_timeout_(false)
, key_(key) {}
// Creates a chained callback for an already constructed request.
//
// key:     stored in key_; maybe_finish() files this callback's response under it.
// request: forwarded to the SimpleRequestCallback constructor.
// chain:   stored in chain_; may be empty (the code checks chain_ before use).
//
// Both has_pending_ and has_error_or_timeout_ start out false.
ChainedRequestCallback::ChainedRequestCallback(const String& key, const Request::ConstPtr& request,
const Ptr& chain)
: SimpleRequestCallback(request)
, chain_(chain)
, has_pending_(false)
, has_error_or_timeout_(false)
, key_(key) {}
// Creates a new chained callback for `query`, registered under `key`, whose
// chain_ points back at this callback. Marks this callback as having a
// pending chained request.
ChainedRequestCallback::Ptr ChainedRequestCallback::chain(const String& key, const String& query) {
  has_pending_ = true;
  const Ptr self(this);
  Ptr next(new ChainedRequestCallback(key, query, self));
  return next;
}
// Creates a new chained callback for `request`, registered under `key`, whose
// chain_ points back at this callback. Marks this callback as having a
// pending chained request.
ChainedRequestCallback::Ptr ChainedRequestCallback::chain(const String& key,
                                                          const Request::ConstPtr& request) {
  has_pending_ = true;
  const Ptr self(this);
  Ptr next(new ChainedRequestCallback(key, request, self));
  return next;
}
// Looks up the response stored under `key`. Returns it when it exists and is
// a RESULT response; otherwise returns an empty pointer.
ResultResponse::Ptr ChainedRequestCallback::result(const String& key) const {
  const Map::const_iterator found = responses_.find(key);
  if (found != responses_.end() && found->second->opcode() == CQL_OPCODE_RESULT) {
    return found->second;
  }
  return ResultResponse::Ptr();
}
// Writes the chained callback (if there is one) on the same connection and
// reports an error when that write returns a negative result. Afterwards
// on_chain_write() is invoked regardless of the outcome.
void ChainedRequestCallback::on_internal_write(Connection* connection) {
  if (chain_ && connection->write_and_flush(chain_) < 0) {
    on_error(CASS_ERROR_LIB_NO_STREAMS,
             "No streams available when attempting to write chained request");
  }
  on_chain_write(connection);
}
// Keeps the body of the received response and checks whether the chain can
// now be completed.
void ChainedRequestCallback::on_internal_set(ResponseMessage* response) {
  this->response_ = response->response_body();
  this->maybe_finish();
}
// Reports an error at most once per callback. The error is handed to the
// chained callback's on_error() when there is one; otherwise it is delivered
// through on_chain_error().
void ChainedRequestCallback::on_internal_error(CassError code, const String& message) {
  // Only the first error or timeout is reported.
  if (has_error_or_timeout_) {
    return;
  }
  has_error_or_timeout_ = true;

  if (!chain_) {
    on_chain_error(code, message);
    return;
  }
  chain_->on_error(code, message);
}
// Reports a timeout at most once per callback. The timeout is handed to the
// chained callback's on_internal_timeout() when there is one; otherwise it is
// delivered through on_chain_timeout().
void ChainedRequestCallback::on_internal_timeout() {
  // Only the first error or timeout is reported.
  if (has_error_or_timeout_) {
    return;
  }
  has_error_or_timeout_ = true;

  if (!chain_) {
    on_chain_timeout();
    return;
  }
  chain_->on_internal_timeout();
}
// Takes over the caller's response map by swapping it with this callback's
// own map (so `responses` holds the previous contents afterwards), then
// checks whether the chain can now be completed.
void ChainedRequestCallback::set_chain_responses(Map& responses) {
  this->responses_.swap(responses);
  this->maybe_finish();
}
// Returns true once this callback has its own response, has seen no error or
// timeout, and, if a chained request is pending, has been handed a non-empty
// response map.
bool ChainedRequestCallback::is_finished() const {
  if (!response_ || has_error_or_timeout_) {
    return false;
  }
  // Without a pending chained request our own response is enough.
  return !has_pending_ || !responses_.empty();
}
void ChainedRequestCallback::maybe_finish() {
if (is_finished()) {
if (response_->opcode() == CQL_OPCODE_ERROR) {
if (request()->opcode() == CQL_OPCODE_QUERY) {
LOG_ERROR("Chained error response %s for query \"%s\"",
static_cast<const ErrorResponse*>(response_.get())->error_message().c_str(),
static_cast<const QueryRequest*>(request())->query().c_str());
} else {
LOG_ERROR("Chained error response %s",
static_cast<const ErrorResponse*>(response_.get())->error_message().c_str());
}
}
responses_[key_] = response_;
if (chain_) {
chain_->set_chain_responses(responses_);
} else {
on_chain_set();
}
}
}