// blob: 655c453fe39e128817139fc98852f963da1846c3 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/inbound_call.h"
#include <cstdint>
#include <memory>
#include <ostream>
#include <glog/logging.h>
#include <google/protobuf/message.h>
#include <google/protobuf/message_lite.h>
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/connection.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/rpc/rpcz_store.h"
#include "kudu/rpc/serialization.h"
#include "kudu/rpc/service_if.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/metrics.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/trace.h"
namespace google {
namespace protobuf {
class FieldDescriptor;
}
}
using google::protobuf::FieldDescriptor;
using google::protobuf::Message;
using google::protobuf::MessageLite;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace rpc {
InboundCall::InboundCall(Connection* conn)
: conn_(conn),
trace_(new Trace),
method_info_(nullptr),
deadline_(MonoTime::Max()) {
RecordCallReceived();
}
// Nothing to release by hand: the destructor body is empty and the members
// clean up after themselves.
InboundCall::~InboundCall() = default;
// Parses the request carried by 'transfer' into this call.
//
// On success the call holds:
//   - the decoded request header (header_) and the remote method it names,
//   - the call deadline, if the header carries a non-zero timeout,
//   - one slice per inbound sidecar, and
//   - serialized_request_ trimmed to just the request message.
// The transfer itself is retained because the slices above are views into
// its buffer.
//
// Returns Corruption if the header lacks an initialized remote_method or
// announces more sidecars than TransferLimits::kMaxSidecars, and propagates
// any error from message or sidecar parsing.
Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
  TRACE_EVENT_FLOW_BEGIN0("rpc", "InboundCall", this);
  TRACE_EVENT0("rpc", "InboundCall::ParseFrom");
  RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_, &serialized_request_));

  // Adopt the service/method info from the header as soon as it's available.
  if (PREDICT_FALSE(!header_.has_remote_method())) {
    return Status::Corruption("Non-connection context request header must specify remote_method");
  }
  if (PREDICT_FALSE(!header_.remote_method().IsInitialized())) {
    return Status::Corruption("remote_method in request header is not initialized",
                              header_.remote_method().InitializationErrorString());
  }
  remote_method_.FromPB(header_.remote_method());

  // Compute and cache the call deadline. A missing or zero timeout leaves the
  // deadline at the value set by the constructor.
  if (header_.has_timeout_millis() && header_.timeout_millis() != 0) {
    deadline_ = timing_.time_received + MonoDelta::FromMilliseconds(header_.timeout_millis());
  }

  if (header_.sidecar_offsets_size() > TransferLimits::kMaxSidecars) {
    // Substitute() placeholders are positional ($0, $1, ...); a printf-style
    // "%d" would be emitted literally and the limit would never be shown.
    return Status::Corruption(strings::Substitute(
        "Received $0 additional payload slices, expected at most $1",
        header_.sidecar_offsets_size(), TransferLimits::kMaxSidecars));
  }

  RETURN_NOT_OK(RpcSidecar::ParseSidecars(
      header_.sidecar_offsets(), serialized_request_, inbound_sidecar_slices_));
  if (header_.sidecar_offsets_size() > 0) {
    // Trim the request to just the message: everything from the first sidecar
    // offset onwards belongs to the sidecars.
    serialized_request_ = Slice(serialized_request_.data(), header_.sidecar_offsets(0));
  }

  // Retain the buffer that we have a view into.
  transfer_.swap(transfer);
  return Status::OK();
}
// Sends 'response' back to the caller as a successful reply.
void InboundCall::RespondSuccess(const MessageLite& response) {
  TRACE_EVENT0("rpc", "InboundCall::RespondSuccess");
  const bool is_success = true;
  Respond(response, is_success);
}
void InboundCall::RespondUnsupportedFeature(const vector<uint32_t>& unsupported_features) {
TRACE_EVENT0("rpc", "InboundCall::RespondUnsupportedFeature");
ErrorStatusPB err;
err.set_message("unsupported feature flags");
err.set_code(ErrorStatusPB::ERROR_INVALID_REQUEST);
for (uint32_t feature : unsupported_features) {
err.add_unsupported_feature_flags(feature);
}
Respond(err, false);
}
// Fails the call with the given RPC-level 'error_code'; the error message
// sent to the caller is the string form of 'status'.
void InboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
                                 const Status& status) {
  TRACE_EVENT0("rpc", "InboundCall::RespondFailure");

  ErrorStatusPB error_pb;
  error_pb.set_message(status.ToString());
  error_pb.set_code(error_code);

  const bool is_success = false;
  Respond(error_pb, is_success);
}
void InboundCall::RespondApplicationError(int error_ext_id, const std::string& message,
const MessageLite& app_error_pb) {
ErrorStatusPB err;
ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err);
Respond(err, false);
}
// Fills 'err' with 'message' and, when 'error_ext_id' names a known extension
// of ErrorStatusPB, merges 'app_error_pb' into that extension field.
//
// An unknown extension number is reported with LOG(DFATAL); in that case
// 'err' still carries the message but no application payload.
void InboundCall::ApplicationErrorToPB(int error_ext_id, const std::string& message,
                                       const google::protobuf::MessageLite& app_error_pb,
                                       ErrorStatusPB* err) {
  err->set_message(message);

  const auto* reflection = err->GetReflection();
  const FieldDescriptor* ext_field = reflection->FindKnownExtensionByNumber(error_ext_id);
  if (ext_field == nullptr) {
    LOG(DFATAL) << "Unable to find application error extension ID " << error_ext_id
                << " (message=" << message << ")";
    return;
  }

  // CheckTypeAndMergeFrom verifies that the payload matches the extension's
  // message type before merging it in.
  reflection->MutableMessage(err, ext_field)->CheckTypeAndMergeFrom(app_error_pb);
}
// Shared tail of RespondSuccess(), RespondUnsupportedFeature(),
// RespondFailure() and RespondApplicationError(): serializes 'response' into
// the call's response buffers, records completion timing, registers the call
// with the connection's rpcz store, and queues the response on the connection.
//
// 'is_success' decides whether the response header is flagged as an error
// (see SerializeResponseBuffer()).
void InboundCall::Respond(const MessageLite& response,
bool is_success) {
TRACE_EVENT_FLOW_END0("rpc", "InboundCall", this);
SerializeResponseBuffer(response, is_success);
TRACE_EVENT_ASYNC_END1("rpc", "InboundCall", this,
"method", remote_method_.method_name());
TRACE_TO(trace_, "Queueing $0 response", is_success ? "success" : "failure");
RecordHandlingCompleted();
conn_->rpcz_store()->AddCall(this);
// 'this' is wrapped in a gscoped_ptr and handed to the connection, so the
// connection presumably owns the call from here on and it must not be touched
// after this line -- confirm against Connection::QueueResponseForCall().
conn_->QueueResponseForCall(gscoped_ptr<InboundCall>(this));
}
void InboundCall::SerializeResponseBuffer(const MessageLite& response,
bool is_success) {
if (PREDICT_FALSE(!response.IsInitialized())) {
LOG(ERROR) << "Invalid RPC response for " << ToString()
<< ": protobuf missing required fields: "
<< response.InitializationErrorString();
// Send it along anyway -- the client will also notice the missing fields
// and produce an error on the other side, but this will at least
// make it clear on both sides of the RPC connection what kind of error
// happened.
}
uint32_t protobuf_msg_size = response.ByteSize();
ResponseHeader resp_hdr;
resp_hdr.set_call_id(header_.call_id());
resp_hdr.set_is_error(!is_success);
int32_t sidecar_byte_size = 0;
for (const unique_ptr<RpcSidecar>& car : outbound_sidecars_) {
resp_hdr.add_sidecar_offsets(sidecar_byte_size + protobuf_msg_size);
int32_t sidecar_bytes = car->AsSlice().size();
DCHECK_LE(sidecar_byte_size, TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes);
sidecar_byte_size += sidecar_bytes;
}
serialization::SerializeMessage(response, &response_msg_buf_,
sidecar_byte_size, true);
int64_t main_msg_size = sidecar_byte_size + response_msg_buf_.size();
serialization::SerializeHeader(resp_hdr, main_msg_size,
&response_hdr_buf_);
}
// Fills 'slices' with views of the already-serialized response, in wire
// order: header buffer, message buffer, then each outbound sidecar.
//
// Returns the number of slices written (2 + number of sidecars). The response
// buffers must already be populated; this and the capacity of 'slices' are
// only verified in debug builds.
size_t InboundCall::SerializeResponseTo(TransferPayload* slices) const {
  TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
  DCHECK_GT(response_hdr_buf_.size(), 0);
  DCHECK_GT(response_msg_buf_.size(), 0);

  size_t n_slices = 2 + outbound_sidecars_.size();
  DCHECK_LE(n_slices, slices->size());

  auto out = slices->begin();
  *out = Slice(response_hdr_buf_);
  ++out;
  *out = Slice(response_msg_buf_);
  ++out;
  for (auto& car : outbound_sidecars_) {
    *out = car->AsSlice();
    ++out;
  }

  DCHECK_EQ(out - slices->begin(), n_slices);
  return n_slices;
}
// Attaches 'car' to the response as an outbound sidecar and stores its
// zero-based position in '*idx'.
//
// Returns ServiceUnavailable when TransferLimits::kMaxSidecars sidecars are
// already attached, and RuntimeError when adding 'car' would push the total
// sidecar payload past TransferLimits::kMaxTotalSidecarBytes. On failure
// nothing is attached and '*idx' is left untouched.
Status InboundCall::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
  // Check that the number of sidecars does not exceed the number of payload
  // slices that are free (two are used up by the header and main message
  // protobufs).
  //
  // The comparison must be '>=': this check runs before the new sidecar is
  // appended, so with '>' a (kMaxSidecars + 1)-th sidecar would be accepted.
  // SerializeResponseTo() would then need more slices than the payload is
  // described as having, and ParseFrom() treats more than kMaxSidecars
  // sidecars as corruption.
  if (outbound_sidecars_.size() >= TransferLimits::kMaxSidecars) {
    return Status::ServiceUnavailable("All available sidecars already used");
  }

  // Written as a subtraction so the check itself cannot overflow the running
  // total.
  int64_t sidecar_bytes = car->AsSlice().size();
  if (outbound_sidecars_total_bytes_ >
      TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes) {
    return Status::RuntimeError(Substitute("Total size of sidecars $0 would exceed limit $1",
        static_cast<int64_t>(outbound_sidecars_total_bytes_) + sidecar_bytes,
        TransferLimits::kMaxTotalSidecarBytes));
  }

  outbound_sidecars_.emplace_back(std::move(car));
  outbound_sidecars_total_bytes_ += sidecar_bytes;
  DCHECK_GE(outbound_sidecars_total_bytes_, 0);
  *idx = outbound_sidecars_.size() - 1;
  return Status::OK();
}
// Returns a one-line description of the call for logs: the remote method, the
// peer address, and either the full request id (client, sequence number,
// attempt number) when the header has one, or the plain call id otherwise.
string InboundCall::ToString() const {
  const string method = remote_method_.ToString();
  const string peer = conn_->remote().ToString();

  if (!header_.has_request_id()) {
    return Substitute("Call $0 from $1 (request call id $2)",
                      method, peer, header_.call_id());
  }

  const auto& req_id = header_.request_id();
  return Substitute("Call $0 from $1 (ReqId={client: $2, seq_no=$3, attempt_no=$4})",
                    method, peer,
                    req_id.client_id(), req_id.seq_no(), req_id.attempt_no());
}
// Describes this in-flight call in 'resp': a copy of the request header, the
// trace buffer (only when 'req' asks for traces and the call has one), and
// the microseconds elapsed since the call was received.
void InboundCall::DumpPB(const DumpConnectionsRequestPB& req,
                         RpcCallInProgressPB* resp) {
  resp->mutable_header()->CopyFrom(header_);

  const bool want_trace = req.include_traces() && trace_;
  if (want_trace) {
    resp->set_trace_buffer(trace_->DumpToString());
  }

  const auto elapsed = MonoTime::Now() - timing_.time_received;
  resp->set_micros_elapsed(elapsed.ToMicroseconds());
}
// Returns the remote user as reported by the connection this call arrived on.
const RemoteUser& InboundCall::remote_user() const {
  const RemoteUser& user = conn_->remote_user();
  return user;
}
// Returns the peer address of the connection this call arrived on.
const Sockaddr& InboundCall::remote_address() const {
  const Sockaddr& peer = conn_->remote();
  return peer;
}
// Returns the connection this call was received on. The reference points at
// the call's own member, so it is only usable while the call is alive.
const scoped_refptr<Connection>& InboundCall::connection() const {
return conn_;
}
// Returns a raw pointer to the Trace allocated for this call in the
// constructor. The call keeps its own handle on the trace (trace_).
Trace* InboundCall::trace() {
return trace_.get();
}
// Marks the moment the call was received: opens the async "InboundCall" trace
// event (closed again in Respond()) and stores the current time as
// timing_.time_received. Invoked from the constructor.
void InboundCall::RecordCallReceived() {
TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this);
DCHECK(!timing_.time_received.Initialized()); // Protect against multiple calls.
timing_.time_received = MonoTime::Now();
}
// Marks the moment a handler picks up the call and reports how long the call
// waited since it was received, in microseconds, to 'incoming_queue_time'.
// 'incoming_queue_time' must not be null.
void InboundCall::RecordHandlingStarted(Histogram* incoming_queue_time) {
  DCHECK(incoming_queue_time != nullptr);
  DCHECK(!timing_.time_handled.Initialized());  // Protect against multiple calls.

  timing_.time_handled = MonoTime::Now();
  const auto queued_for = timing_.time_handled - timing_.time_received;
  incoming_queue_time->Increment(queued_for.ToMicroseconds());
}
// Marks the moment the call was completed and, when applicable, reports the
// handler latency (completion time minus handling start, in microseconds) to
// the method's latency histogram.
void InboundCall::RecordHandlingCompleted() {
  DCHECK(!timing_.time_completed.Initialized());  // Protect against multiple calls.
  timing_.time_completed = MonoTime::Now();

  // Sometimes we respond to a call before we begin handling it (e.g. due to
  // queue overflow, etc). Those calls must not be counted in the histogram,
  // and neither can calls that have no method info attached.
  const bool was_handled = timing_.time_handled.Initialized();
  if (was_handled && method_info_) {
    const auto handler_time = timing_.time_completed - timing_.time_handled;
    method_info_->handler_latency_histogram->Increment(handler_time.ToMicroseconds());
  }
}
bool InboundCall::ClientTimedOut() const {
return MonoTime::Now() >= deadline_;
}
// Returns the timestamp stored by RecordCallReceived(), i.e. when this call
// object was created.
MonoTime InboundCall::GetTimeReceived() const {
return timing_.time_received;
}
// Returns a copy of the feature flags the request header lists as required,
// in header order.
vector<uint32_t> InboundCall::GetRequiredFeatures() const {
  const auto& flags = header_.required_feature_flags();
  return vector<uint32_t>(flags.begin(), flags.end());
}
// Stores the slice for inbound sidecar number 'idx' in '*sidecar'.
//
// Returns InvalidArgument when 'idx' is outside the range of sidecars
// announced by the request header. Must not be called after the transfer has
// been discarded (checked in debug builds only), since the slices are views
// into the transfer's buffer.
Status InboundCall::GetInboundSidecar(int idx, Slice* sidecar) const {
  DCHECK(transfer_) << "Sidecars have been discarded";

  const bool in_range = idx >= 0 && idx < header_.sidecar_offsets_size();
  if (!in_range) {
    return Status::InvalidArgument(strings::Substitute(
        "Index $0 does not reference a valid sidecar", idx));
  }

  *sidecar = inbound_sidecar_slices_[idx];
  return Status::OK();
}
// Drops this call's hold on the inbound transfer. ParseFrom() notes that the
// call keeps views into that buffer, and GetInboundSidecar() DCHECKs that the
// transfer is still present, so inbound sidecars should not be requested
// after this has been called.
void InboundCall::DiscardTransfer() {
transfer_.reset();
}
// Returns the size in bytes of the retained inbound transfer's data, or 0
// when no transfer is held (not parsed yet, or already discarded).
size_t InboundCall::GetTransferSize() {
  if (transfer_) {
    return transfer_->data().size();
  }
  return 0;
}
} // namespace rpc
} // namespace kudu