blob: 1d53d163fb5ae7a90c44159394781ee97dd9f3bb [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "response.hpp"
#include "auth_responses.hpp"
#include "error_response.hpp"
#include "event_response.hpp"
#include "logger.hpp"
#include "ready_response.hpp"
#include "result_response.hpp"
#include "supported_response.hpp"
#include <cstring>
using namespace datastax::internal::core;
/**
* A dummy invalid protocol error response that's used to handle responses
* encoded with deprecated protocol versions.
*/
class InvalidProtocolErrorResponse : public ErrorResponse {
public:
InvalidProtocolErrorResponse()
: ErrorResponse(CQL_ERROR_PROTOCOL_ERROR, "Invalid or unsupported protocol version") {}
virtual bool decode(Decoder& decoder) {
return true; // Ignore decoding the body
}
};
Response::Response(uint8_t opcode)
: opcode_(opcode) {
memset(&tracing_id_, 0, sizeof(CassUuid));
}
bool Response::has_tracing_id() const { return tracing_id_.time_and_version != 0; }
bool Response::decode_trace_id(Decoder& decoder) { return decoder.decode_uuid(&tracing_id_); }
bool Response::decode_custom_payload(Decoder& decoder) {
return decoder.decode_custom_payload(custom_payload_);
}
bool Response::decode_warnings(Decoder& decoder) { return decoder.decode_warnings(warnings_); }
bool ResponseMessage::allocate_body(int8_t opcode) {
response_body_.reset();
switch (opcode) {
case CQL_OPCODE_ERROR:
response_body_.reset(new ErrorResponse());
return true;
case CQL_OPCODE_READY:
response_body_.reset(new ReadyResponse());
return true;
case CQL_OPCODE_AUTHENTICATE:
response_body_.reset(new AuthenticateResponse());
return true;
case CQL_OPCODE_SUPPORTED:
response_body_.reset(new SupportedResponse());
return true;
case CQL_OPCODE_RESULT:
response_body_.reset(new ResultResponse());
return true;
case CQL_OPCODE_EVENT:
response_body_.reset(new EventResponse());
return true;
case CQL_OPCODE_AUTH_CHALLENGE:
response_body_.reset(new AuthChallengeResponse());
return true;
case CQL_OPCODE_AUTH_SUCCESS:
response_body_.reset(new AuthSuccessResponse());
return true;
default:
return false;
}
}
ssize_t ResponseMessage::decode(const char* input, size_t size) {
const char* input_pos = input;
received_ += size;
if (!is_header_received_) {
if (version_ == 0) {
if (received_ < 1) {
LOG_ERROR("Expected at least 1 byte to decode header version");
return -1;
}
version_ = input[0] & 0x7F; // "input" will always have at least 1 bytes
if (version_ >= CASS_PROTOCOL_VERSION_V3) {
header_size_ = CASS_HEADER_SIZE_V3;
} else {
header_size_ = CASS_HEADER_SIZE_V1_AND_V2;
}
}
if (received_ >= header_size_) {
// We may have received more data then we need, only copy what we need
size_t overage = received_ - header_size_;
size_t needed = size - overage;
memcpy(header_buffer_pos_, input_pos, needed);
header_buffer_pos_ += needed;
input_pos += needed;
assert(header_buffer_pos_ == header_buffer_ + header_size_);
const char* buffer = header_buffer_ + 1; // Skip over "version" byte
flags_ = *(buffer++);
if (version_ >= CASS_PROTOCOL_VERSION_V3) {
buffer = decode_int16(buffer, stream_);
} else {
stream_ = *(buffer++);
}
opcode_ = *(buffer++);
decode_int32(buffer, length_);
is_header_received_ = true;
// If a deprecated version of the protocol is encountered then we fake
// an invalid protocol error.
if (version_ < CASS_PROTOCOL_VERSION_V3) {
response_body_.reset(new InvalidProtocolErrorResponse());
} else if (!allocate_body(opcode_) || !response_body_) {
return -1;
}
response_body_->set_buffer(length_);
body_buffer_pos_ = response_body_->data();
} else {
// We haven't received all the data for the header. We consume the
// entire buffer.
memcpy(header_buffer_pos_, input_pos, size);
header_buffer_pos_ += size;
return size;
}
}
const size_t remaining = size - (input_pos - input);
const size_t frame_size = header_size_ + length_;
if (received_ >= frame_size) {
// We may have received more data then we need, only copy what we need
size_t overage = received_ - frame_size;
size_t needed = remaining - overage;
memcpy(body_buffer_pos_, input_pos, needed);
body_buffer_pos_ += needed;
input_pos += needed;
assert(body_buffer_pos_ == response_body_->data() + length_);
Decoder decoder(response_body_->data(), length_, ProtocolVersion(version_));
if (flags_ & CASS_FLAG_TRACING) {
if (!response_body_->decode_trace_id(decoder)) return -1;
}
if (flags_ & CASS_FLAG_WARNING) {
if (!response_body_->decode_warnings(decoder)) return -1;
}
if (flags_ & CASS_FLAG_CUSTOM_PAYLOAD) {
if (!response_body_->decode_custom_payload(decoder)) return -1;
}
if (!response_body_->decode(decoder)) {
is_body_error_ = true;
return -1;
}
is_body_ready_ = true;
} else {
// We haven't received all the data for the frame. We consume the entire
// buffer.
memcpy(body_buffer_pos_, input_pos, remaining);
body_buffer_pos_ += remaining;
return size;
}
return input_pos - input;
}