blob: 4c5f75b055b8d7c881d1f585f5c61b08fc7943f1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/inbound_call.h"
#include <memory>
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/connection.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/rpc/serialization.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/metrics.h"
#include "kudu/util/trace.h"
using google::protobuf::FieldDescriptor;
using google::protobuf::io::CodedOutputStream;
using google::protobuf::Message;
using google::protobuf::MessageLite;
using std::shared_ptr;
using std::vector;
using strings::Substitute;
// Boolean command-line flag, off by default. When enabled,
// InboundCall::LogTrace() logs every call's elapsed time and trace at INFO
// level, not only the calls that came close to their client timeout.
// The flag carries the 'advanced' and 'runtime' tags.
// NOTE(review): 'runtime' presumably means the flag may be changed while the
// process is running -- confirm against kudu/util/flag_tags.h.
DEFINE_bool(rpc_dump_all_traces, false,
"If true, dump all RPC traces at INFO level");
TAG_FLAG(rpc_dump_all_traces, advanced);
TAG_FLAG(rpc_dump_all_traces, runtime);
namespace kudu {
namespace rpc {
// Creates a call associated with the connection 'conn'.
//
// A new Trace object is allocated for the call, and the time of receipt is
// stamped right away through RecordCallReceived().
InboundCall::InboundCall(Connection* conn)
    : conn_(conn),
      sidecars_deleter_(&sidecars_),
      trace_(new Trace) {
  RecordCallReceived();
}
// Nothing to do explicitly: the members release whatever they hold.
InboundCall::~InboundCall() = default;
// Parses the data carried by 'transfer' into the request header and the
// serialized request, validates the header, and then keeps the transfer.
//
// Returns:
//  - the error from serialization::ParseMessage() if parsing fails;
//  - Status::Corruption if the header has no 'remote_method', or if that
//    field is not fully initialized;
//  - Status::OK() otherwise.
// The transfer is only retained on success.
Status InboundCall::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
  TRACE_EVENT_FLOW_BEGIN0("rpc", "InboundCall", this);
  TRACE_EVENT0("rpc", "InboundCall::ParseFrom");

  RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_, &serialized_request_));

  // The service/method identity is adopted from the header as early as
  // possible, but only once it is known to be present and complete.
  if (PREDICT_FALSE(!header_.has_remote_method())) {
    return Status::Corruption("Non-connection context request header must specify remote_method");
  }
  const auto& method_pb = header_.remote_method();
  if (PREDICT_FALSE(!method_pb.IsInitialized())) {
    return Status::Corruption("remote_method in request header is not initialized",
                              method_pb.InitializationErrorString());
  }
  remote_method_.FromPB(method_pb);

  // Hold on to the underlying buffer, since we keep a view into it.
  transfer_.swap(transfer);
  return Status::OK();
}
// Sends 'response' back to the caller as a successful reply.
void InboundCall::RespondSuccess(const MessageLite& response) {
  TRACE_EVENT0("rpc", "InboundCall::RespondSuccess");
  const bool is_success = true;
  Respond(response, is_success);
}
void InboundCall::RespondUnsupportedFeature(const vector<uint32_t>& unsupported_features) {
TRACE_EVENT0("rpc", "InboundCall::RespondUnsupportedFeature");
ErrorStatusPB err;
err.set_message("unsupported feature flags");
err.set_code(ErrorStatusPB::ERROR_INVALID_REQUEST);
for (uint32_t feature : unsupported_features) {
err.add_unsupported_feature_flags(feature);
}
Respond(err, false);
}
// Replies with an error whose code is 'error_code' and whose message is the
// string form of 'status'.
void InboundCall::RespondFailure(ErrorStatusPB::RpcErrorCodePB error_code,
                                 const Status& status) {
  TRACE_EVENT0("rpc", "InboundCall::RespondFailure");

  ErrorStatusPB error_pb;
  error_pb.set_code(error_code);
  error_pb.set_message(status.ToString());

  Respond(error_pb, /* is_success= */ false);
}
void InboundCall::RespondApplicationError(int error_ext_id, const std::string& message,
const MessageLite& app_error_pb) {
ErrorStatusPB err;
ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err);
Respond(err, false);
}
// Populates 'err' with 'message' and, when the extension numbered
// 'error_ext_id' is known to ErrorStatusPB, merges 'app_error_pb' into that
// extension field.
//
// An unknown extension id is reported through LOG(DFATAL); in that case only
// the message is set on 'err'.
void InboundCall::ApplicationErrorToPB(int error_ext_id, const std::string& message,
                                       const google::protobuf::MessageLite& app_error_pb,
                                       ErrorStatusPB* err) {
  err->set_message(message);

  const auto* reflection = err->GetReflection();
  const FieldDescriptor* ext_field = reflection->FindKnownExtensionByNumber(error_ext_id);
  if (ext_field == nullptr) {
    LOG(DFATAL) << "Unable to find application error extension ID " << error_ext_id
                << " (message=" << message << ")";
    return;
  }

  reflection->MutableMessage(err, ext_field)->CheckTypeAndMergeFrom(app_error_pb);
}
void InboundCall::Respond(const MessageLite& response,
bool is_success) {
TRACE_EVENT_FLOW_END0("rpc", "InboundCall", this);
Status s = SerializeResponseBuffer(response, is_success);
if (PREDICT_FALSE(!s.ok())) {
// TODO: test error case, serialize error response instead
LOG(DFATAL) << "Unable to serialize response: " << s.ToString();
}
TRACE_EVENT_ASYNC_END1("rpc", "InboundCall", this,
"method", remote_method_.method_name());
TRACE_TO(trace_, "Queueing $0 response", is_success ? "success" : "failure");
LogTrace();
conn_->QueueResponseForCall(gscoped_ptr<InboundCall>(this));
}
// Serializes 'response' into response_msg_buf_ and builds the matching
// response header in response_hdr_buf_.
//
// The header carries the call id of the request, an error marker (set when
// 'is_success' is false), and one offset per attached sidecar. The first
// sidecar offset equals the byte size of the response protobuf, and each
// following offset advances by the size of the preceding sidecar.
//
// Returns the first error reported by the serialization helpers, if any.
Status InboundCall::SerializeResponseBuffer(const MessageLite& response,
                                            bool is_success) {
  const uint32_t pb_size = response.ByteSize();

  ResponseHeader header_pb;
  header_pb.set_is_error(!is_success);
  header_pb.set_call_id(header_.call_id());

  uint32_t next_sidecar_offset = pb_size;
  for (RpcSidecar* sidecar : sidecars_) {
    header_pb.add_sidecar_offsets(next_sidecar_offset);
    next_sidecar_offset += sidecar->AsSlice().size();
  }

  // Total number of bytes contributed by the sidecars.
  const int sidecar_bytes = next_sidecar_offset - pb_size;

  // NOTE(review): the meaning of the trailing 'true' argument is defined by
  // serialization::SerializeMessage() -- confirm against serialization.h.
  RETURN_NOT_OK(serialization::SerializeMessage(response, &response_msg_buf_,
                                                sidecar_bytes, true));

  const int total_msg_size = sidecar_bytes + response_msg_buf_.size();
  return serialization::SerializeHeader(header_pb, total_msg_size,
                                        &response_hdr_buf_);
}
// Appends the pieces of the already-serialized response to 'slices', in
// order: the header buffer, the main message buffer, then one slice per
// sidecar.
//
// Both buffers must be non-empty, i.e. the response must have been
// serialized beforehand; this is enforced with CHECKs.
void InboundCall::SerializeResponseTo(vector<Slice>* slices) const {
  TRACE_EVENT0("rpc", "InboundCall::SerializeResponseTo");
  CHECK_GT(response_hdr_buf_.size(), 0);
  CHECK_GT(response_msg_buf_.size(), 0);

  // Two slices for header + message, plus one for each sidecar.
  const size_t wanted_capacity = slices->size() + 2 + sidecars_.size();
  slices->reserve(wanted_capacity);

  slices->push_back(Slice(response_hdr_buf_));
  slices->push_back(Slice(response_msg_buf_));
  for (RpcSidecar* sidecar : sidecars_) {
    slices->push_back(sidecar->AsSlice());
  }
}
// Attaches 'car' to the response and stores its index in '*idx'.
//
// On success this call takes ownership of the sidecar. If no payload slice
// is left for it, Status::ServiceUnavailable is returned, '*idx' is left
// untouched, and the sidecar is not added.
Status InboundCall::AddRpcSidecar(gscoped_ptr<RpcSidecar> car, int* idx) {
  // A response is sent as two slices (header and main message protobufs)
  // plus one slice per sidecar, see SerializeResponseTo(). This check runs
  // BEFORE the new sidecar is added, so it has to reject the request as soon
  // as the existing slices already fill all kMaxPayloadSlices: using '>'
  // here would let one sidecar too many through.
  if (sidecars_.size() + 2 >= OutboundTransfer::kMaxPayloadSlices) {
    return Status::ServiceUnavailable("All available sidecars already used");
  }

  sidecars_.push_back(car.release());
  *idx = sidecars_.size() - 1;
  return Status::OK();
}
// Builds a human-readable description of the call: the remote method, the
// remote address of the connection, and the call id from the request header.
string InboundCall::ToString() const {
  const auto method_str = remote_method_.ToString();
  const auto remote_str = conn_->remote().ToString();
  return Substitute("Call $0 from $1 (request call id $2)",
                    method_str,
                    remote_str,
                    header_.call_id());
}
void InboundCall::DumpPB(const DumpRunningRpcsRequestPB& req,
RpcCallInProgressPB* resp) {
resp->mutable_header()->CopyFrom(header_);
if (req.include_traces() && trace_) {
resp->set_trace_buffer(trace_->DumpToString(true));
}
resp->set_micros_elapsed(MonoTime::Now(MonoTime::FINE).GetDeltaSince(timing_.time_received)
.ToMicroseconds());
}
// Logs timing and trace information about this call.
//
// If the client supplied a non-zero timeout and the call has taken more than
// 75% of it, a WARNING with the elapsed time is logged, followed by the trace
// (when non-empty), and nothing else is logged. Otherwise, if the
// rpc_dump_all_traces flag is set, the elapsed time and trace are logged at
// INFO level.
void InboundCall::LogTrace() const {
  MonoTime now = MonoTime::Now(MonoTime::FINE);
  // Keep the elapsed time in 64 bits: narrowing it into an 'int' could wrap
  // for a call that has been outstanding for a very long time.
  const int64_t total_time = now.GetDeltaSince(timing_.time_received).ToMilliseconds();

  if (header_.has_timeout_millis() && header_.timeout_millis() > 0) {
    const double log_threshold = header_.timeout_millis() * 0.75f;
    if (total_time > log_threshold) {
      // TODO: consider pushing this onto another thread since it may be slow.
      // The traces may also be too large to fit in a log message.
      LOG(WARNING) << ToString() << " took " << total_time << "ms (client timeout "
                   << header_.timeout_millis() << ").";
      std::string s = trace_->DumpToString(true);
      if (!s.empty()) {
        LOG(WARNING) << "Trace:\n" << s;
      }
      // Already logged above; do not log again at INFO level.
      return;
    }
  }

  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
    LOG(INFO) << ToString() << " took " << total_time << "ms. Trace:";
    trace_->Dump(&LOG(INFO), true);
  }
}
// Returns the user credentials reported by the connection of this call.
const UserCredentials& InboundCall::user_credentials() const {
  return conn_->user_credentials();
}
// Returns the remote socket address reported by the connection of this call.
const Sockaddr& InboundCall::remote_address() const {
  return conn_->remote();
}
// Returns the reference-counted pointer to the connection of this call.
const scoped_refptr<Connection>& InboundCall::connection() const {
  return conn_;
}
// Returns a raw pointer to the trace object associated with this call.
Trace* InboundCall::trace() {
  return trace_.get();
}
// Stamps the time at which the call was received and opens the "InboundCall"
// async trace event. Must be called at most once per call (DCHECKed).
void InboundCall::RecordCallReceived() {
  TRACE_EVENT_ASYNC_BEGIN0("rpc", "InboundCall", this);

  // Guard against being invoked more than once.
  DCHECK(!timing_.time_received.Initialized());
  timing_.time_received = MonoTime::Now(MonoTime::FINE);
}
// Stamps the time at which handling of the call started, and adds the time
// elapsed since the call was received (in microseconds) to the
// 'incoming_queue_time' histogram. Must be called at most once (DCHECKed).
void InboundCall::RecordHandlingStarted(scoped_refptr<Histogram> incoming_queue_time) {
  DCHECK(incoming_queue_time != nullptr);
  // Guard against being invoked more than once.
  DCHECK(!timing_.time_handled.Initialized());

  timing_.time_handled = MonoTime::Now(MonoTime::FINE);

  MonoDelta time_in_queue = timing_.time_handled.GetDeltaSince(timing_.time_received);
  incoming_queue_time->Increment(time_in_queue.ToMicroseconds());
}
// Stamps the time at which handling of the call completed, and adds the time
// elapsed since handling started (in microseconds) to the 'handler_run_time'
// histogram. Must be called at most once (DCHECKed).
void InboundCall::RecordHandlingCompleted(scoped_refptr<Histogram> handler_run_time) {
  DCHECK(handler_run_time != nullptr);
  // Guard against being invoked more than once.
  DCHECK(!timing_.time_completed.Initialized());

  timing_.time_completed = MonoTime::Now(MonoTime::FINE);

  MonoDelta time_in_handler = timing_.time_completed.GetDeltaSince(timing_.time_handled);
  handler_run_time->Increment(time_in_handler.ToMicroseconds());
}
// Returns true if the client supplied a non-zero timeout and more than that
// many milliseconds have passed since the call was received. A call without
// a timeout (absent or zero) never times out.
bool InboundCall::ClientTimedOut() const {
  if (!header_.has_timeout_millis() || header_.timeout_millis() == 0) {
    return false;
  }

  MonoTime now = MonoTime::Now(MonoTime::FINE);
  // Compare in 64-bit signed arithmetic: this avoids both narrowing the
  // elapsed time into an 'int' and comparing a signed value against the
  // unsigned timeout field.
  const int64_t elapsed_ms = now.GetDeltaSince(timing_.time_received).ToMilliseconds();
  return elapsed_ms > static_cast<int64_t>(header_.timeout_millis());
}
// Returns the point in time at which the client's timeout expires: the time
// the call was received plus the timeout from the request header. If the
// header carries no timeout, or a zero one, MonoTime::Max() is returned.
MonoTime InboundCall::GetClientDeadline() const {
  const bool has_timeout = header_.has_timeout_millis() && header_.timeout_millis() != 0;
  if (!has_timeout) {
    return MonoTime::Max();
  }

  MonoTime client_deadline = timing_.time_received;
  client_deadline.AddDelta(MonoDelta::FromMilliseconds(header_.timeout_millis()));
  return client_deadline;
}
// Returns the timestamp recorded when the call was received.
MonoTime InboundCall::GetTimeReceived() const {
  return timing_.time_received;
}
// Returns a copy of the required feature flags listed in the request header,
// in the order in which they appear there.
vector<uint32_t> InboundCall::GetRequiredFeatures() const {
  const auto& required_flags = header_.required_feature_flags();
  return vector<uint32_t>(required_flags.begin(), required_flags.end());
}
} // namespace rpc
} // namespace kudu