blob: 17761f5013c14ae56cca8157a56a0541cbfcce87 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <type_traits>
#include <unordered_set>
#include <utility>
#include <vector>
#include <boost/function.hpp>
#include <gflags/gflags.h>
#include <google/protobuf/message.h>
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/gutil/walltime.h"
#include "kudu/rpc/constants.h"
#include "kudu/rpc/outbound_call.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/rpc/serialization.h"
#include "kudu/rpc/transfer.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/net/sockaddr.h"
// 100M cycles should be about 50ms on a 2Ghz box. This should be high
// enough that involuntary context switches don't trigger it, but low enough
// that any serious blocking behavior on the reactor would.
DEFINE_int64(rpc_callback_max_cycles, 100 * 1000 * 1000,
"The maximum number of cycles for which an RPC callback "
"should be allowed to run without emitting a warning."
" (Advanced debugging option)");
TAG_FLAG(rpc_callback_max_cycles, advanced);
TAG_FLAG(rpc_callback_max_cycles, runtime);
// Flag used in debug build for injecting cancellation at different code paths.
DEFINE_int32(rpc_inject_cancellation_state, -1,
"If this flag is not -1, it is the state in which a cancellation request "
"will be injected. Should use values in OutboundCall::State only");
TAG_FLAG(rpc_inject_cancellation_state, unsafe);
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace rpc {
using google::protobuf::Message;
using strings::Substitute;
static const double kMicrosPerSecond = 1000000.0;
///
/// OutboundCall
///
OutboundCall::OutboundCall(const ConnectionId& conn_id,
const RemoteMethod& remote_method,
google::protobuf::Message* response_storage,
RpcController* controller,
ResponseCallback callback)
: state_(READY),
remote_method_(remote_method),
conn_id_(conn_id),
callback_(std::move(callback)),
controller_(DCHECK_NOTNULL(controller)),
response_(DCHECK_NOTNULL(response_storage)),
cancellation_requested_(false) {
DVLOG(4) << "OutboundCall " << this << " constructed with state_: " << StateName(state_)
<< " and RPC timeout: "
<< (controller->timeout().Initialized() ? controller->timeout().ToString() : "none");
header_.set_call_id(kInvalidCallId);
remote_method.ToPB(header_.mutable_remote_method());
start_time_ = MonoTime::Now();
if (!controller_->required_server_features().empty()) {
required_rpc_features_.insert(RpcFeatureFlag::APPLICATION_FEATURE_FLAGS);
}
if (controller_->request_id_) {
header_.set_allocated_request_id(controller_->request_id_.release());
}
}
OutboundCall::~OutboundCall() {
DCHECK(IsFinished());
DVLOG(4) << "OutboundCall " << this << " destroyed with state_: " << StateName(state_);
}
size_t OutboundCall::SerializeTo(TransferPayload* slices) {
DCHECK_LT(0, request_buf_.size())
<< "Must call SetRequestPayload() before SerializeTo()";
const MonoDelta &timeout = controller_->timeout();
if (timeout.Initialized()) {
header_.set_timeout_millis(timeout.ToMilliseconds());
}
for (uint32_t feature : controller_->required_server_features()) {
header_.add_required_feature_flags(feature);
}
DCHECK_LE(0, sidecar_byte_size_);
serialization::SerializeHeader(
header_, sidecar_byte_size_ + request_buf_.size(), &header_buf_);
size_t n_slices = 2 + sidecars_.size();
DCHECK_LE(n_slices, slices->size());
auto slice_iter = slices->begin();
*slice_iter++ = Slice(header_buf_);
*slice_iter++ = Slice(request_buf_);
for (auto& sidecar : sidecars_) {
*slice_iter++ = sidecar->AsSlice();
}
DCHECK_EQ(slice_iter - slices->begin(), n_slices);
return n_slices;
}
void OutboundCall::SetRequestPayload(const Message& req,
vector<unique_ptr<RpcSidecar>>&& sidecars) {
DCHECK_EQ(-1, sidecar_byte_size_);
sidecars_ = move(sidecars);
DCHECK_LE(sidecars_.size(), TransferLimits::kMaxSidecars);
// Compute total size of sidecar payload so that extra space can be reserved as part of
// the request body.
uint32_t message_size = req.ByteSize();
sidecar_byte_size_ = 0;
for (const unique_ptr<RpcSidecar>& car: sidecars_) {
header_.add_sidecar_offsets(sidecar_byte_size_ + message_size);
int32_t sidecar_bytes = car->AsSlice().size();
DCHECK_LE(sidecar_byte_size_, TransferLimits::kMaxTotalSidecarBytes - sidecar_bytes);
sidecar_byte_size_ += sidecar_bytes;
}
serialization::SerializeMessage(req, &request_buf_, sidecar_byte_size_, true);
}
Status OutboundCall::status() const {
std::lock_guard<simple_spinlock> l(lock_);
return status_;
}
const ErrorStatusPB* OutboundCall::error_pb() const {
std::lock_guard<simple_spinlock> l(lock_);
return error_pb_.get();
}
string OutboundCall::StateName(State state) {
switch (state) {
case READY:
return "READY";
case ON_OUTBOUND_QUEUE:
return "ON_OUTBOUND_QUEUE";
case SENDING:
return "SENDING";
case SENT:
return "SENT";
case NEGOTIATION_TIMED_OUT:
return "NEGOTIATION_TIMED_OUT";
case TIMED_OUT:
return "TIMED_OUT";
case CANCELLED:
return "CANCELLED";
case FINISHED_NEGOTIATION_ERROR:
return "FINISHED_NEGOTIATION_ERROR";
case FINISHED_ERROR:
return "FINISHED_ERROR";
case FINISHED_SUCCESS:
return "FINISHED_SUCCESS";
default:
LOG(DFATAL) << "Unknown state in OutboundCall: " << state;
return StringPrintf("UNKNOWN(%d)", state);
}
}
void OutboundCall::set_state(State new_state) {
std::lock_guard<simple_spinlock> l(lock_);
set_state_unlocked(new_state);
}
OutboundCall::State OutboundCall::state() const {
std::lock_guard<simple_spinlock> l(lock_);
return state_;
}
void OutboundCall::set_state_unlocked(State new_state) {
// Sanity check state transitions.
DVLOG(3) << "OutboundCall " << this << " (" << ToString() << ") switching from " <<
StateName(state_) << " to " << StateName(new_state);
switch (new_state) {
case ON_OUTBOUND_QUEUE:
DCHECK_EQ(state_, READY);
break;
case SENDING:
// Allow SENDING to be set idempotently so we don't have to specifically check
// whether the state is transitioning in the RPC code.
DCHECK(state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
break;
case SENT:
DCHECK_EQ(state_, SENDING);
break;
case NEGOTIATION_TIMED_OUT:
DCHECK(state_ == ON_OUTBOUND_QUEUE);
break;
case TIMED_OUT:
DCHECK(state_ == SENT || state_ == ON_OUTBOUND_QUEUE || state_ == SENDING);
break;
case CANCELLED:
DCHECK(state_ == READY || state_ == ON_OUTBOUND_QUEUE || state_ == SENT);
break;
case FINISHED_SUCCESS:
DCHECK_EQ(state_, SENT);
break;
default:
// No sanity checks for others.
break;
}
state_ = new_state;
}
void OutboundCall::Cancel() {
cancellation_requested_ = true;
// No lock needed as it's called from reactor thread
switch (state_) {
case READY:
case ON_OUTBOUND_QUEUE:
case SENT: {
SetCancelled();
break;
}
case SENDING:
case NEGOTIATION_TIMED_OUT:
case TIMED_OUT:
case CANCELLED:
case FINISHED_NEGOTIATION_ERROR:
case FINISHED_ERROR:
case FINISHED_SUCCESS:
break;
}
}
void OutboundCall::CallCallback() {
// Clear references to outbound sidecars before invoking callback.
sidecars_.clear();
int64_t start_cycles = CycleClock::Now();
{
SCOPED_WATCH_STACK(100);
callback_();
// Clear the callback, since it may be holding onto reference counts
// via bound parameters. We do this inside the timer because it's possible
// the user has naughty destructors that block, and we want to account for that
// time here if they happen to run on this thread.
callback_ = NULL;
}
int64_t end_cycles = CycleClock::Now();
int64_t wait_cycles = end_cycles - start_cycles;
if (PREDICT_FALSE(wait_cycles > FLAGS_rpc_callback_max_cycles)) {
double micros = static_cast<double>(wait_cycles) / base::CyclesPerSecond()
* kMicrosPerSecond;
LOG(WARNING) << "RPC callback for " << ToString() << " blocked reactor thread for "
<< micros << "us";
}
}
void OutboundCall::SetResponse(gscoped_ptr<CallResponse> resp) {
call_response_ = std::move(resp);
Slice r(call_response_->serialized_response());
if (call_response_->is_success()) {
// TODO: here we're deserializing the call response within the reactor thread,
// which isn't great, since it would block processing of other RPCs in parallel.
// Should look into a way to avoid this.
if (!response_->ParseFromArray(r.data(), r.size())) {
SetFailed(Status::IOError("invalid RPC response, missing fields",
response_->InitializationErrorString()));
return;
}
set_state(FINISHED_SUCCESS);
CallCallback();
} else {
// Error
unique_ptr<ErrorStatusPB> err(new ErrorStatusPB());
if (!err->ParseFromArray(r.data(), r.size())) {
SetFailed(Status::IOError("Was an RPC error but could not parse error response",
err->InitializationErrorString()));
return;
}
Status s = Status::RemoteError(err->message());
SetFailed(std::move(s), Phase::REMOTE_CALL, std::move(err));
}
}
void OutboundCall::SetQueued() {
set_state(ON_OUTBOUND_QUEUE);
}
void OutboundCall::SetSending() {
set_state(SENDING);
}
void OutboundCall::SetSent() {
set_state(SENT);
// This method is called in the reactor thread, so free the header buf,
// which was also allocated from this thread. tcmalloc's thread caching
// behavior is a lot more efficient if memory is freed from the same thread
// which allocated it -- this lets it keep to thread-local operations instead
// of taking a mutex to put memory back on the global freelist.
delete [] header_buf_.release();
// request_buf_ is also done being used here, but since it was allocated by
// the caller thread, we would rather let that thread free it whenever it
// deletes the RpcController.
// If cancellation was requested, it's now a good time to do the actual cancellation.
if (cancellation_requested()) {
SetCancelled();
}
}
void OutboundCall::SetFailed(Status status,
Phase phase,
unique_ptr<ErrorStatusPB> err_pb) {
DCHECK(!status.ok());
DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL);
{
std::lock_guard<simple_spinlock> l(lock_);
status_ = std::move(status);
error_pb_ = std::move(err_pb);
set_state_unlocked(phase == Phase::CONNECTION_NEGOTIATION
? FINISHED_NEGOTIATION_ERROR
: FINISHED_ERROR);
}
CallCallback();
}
void OutboundCall::SetTimedOut(Phase phase) {
static const char* kErrMsgNegotiation =
"connection negotiation to $1 for RPC $0 timed out after $2 ($3)";
static const char* kErrMsgCall = "$0 RPC to $1 timed out after $2 ($3)";
DCHECK(phase == Phase::CONNECTION_NEGOTIATION || phase == Phase::REMOTE_CALL);
// We have to fetch timeout outside the lock to avoid a lock
// order inversion between this class and RpcController.
const MonoDelta timeout = controller_->timeout();
{
std::lock_guard<simple_spinlock> l(lock_);
status_ = Status::TimedOut(
Substitute((phase == Phase::REMOTE_CALL) ? kErrMsgCall : kErrMsgNegotiation,
remote_method_.method_name(),
conn_id_.remote().ToString(),
timeout.ToString(),
StateName(state_)));
set_state_unlocked((phase == Phase::REMOTE_CALL) ? TIMED_OUT : NEGOTIATION_TIMED_OUT);
}
CallCallback();
}
void OutboundCall::SetCancelled() {
DCHECK(!IsFinished());
{
std::lock_guard<simple_spinlock> l(lock_);
status_ = Status::Aborted(
Substitute("$0 RPC to $1 is cancelled in state $2",
remote_method_.method_name(),
conn_id_.remote().ToString(),
StateName(state_)));
set_state_unlocked(CANCELLED);
}
CallCallback();
}
bool OutboundCall::IsTimedOut() const {
std::lock_guard<simple_spinlock> l(lock_);
switch (state_) {
case NEGOTIATION_TIMED_OUT: // fall-through
case TIMED_OUT:
return true;
default:
return false;
}
}
bool OutboundCall::IsCancelled() const {
std::lock_guard<simple_spinlock> l(lock_);
return state_ == CANCELLED;
}
bool OutboundCall::IsNegotiationError() const {
std::lock_guard<simple_spinlock> l(lock_);
switch (state_) {
case FINISHED_NEGOTIATION_ERROR: // fall-through
case NEGOTIATION_TIMED_OUT:
return true;
default:
return false;
}
}
bool OutboundCall::IsFinished() const {
std::lock_guard<simple_spinlock> l(lock_);
switch (state_) {
case READY:
case SENDING:
case ON_OUTBOUND_QUEUE:
case SENT:
return false;
case NEGOTIATION_TIMED_OUT:
case TIMED_OUT:
case CANCELLED:
case FINISHED_NEGOTIATION_ERROR:
case FINISHED_ERROR:
case FINISHED_SUCCESS:
return true;
default:
LOG(FATAL) << "Unknown call state: " << state_;
return false;
}
}
string OutboundCall::ToString() const {
return Substitute("RPC call $0 -> $1", remote_method_.ToString(), conn_id_.ToString());
}
void OutboundCall::DumpPB(const DumpConnectionsRequestPB& req,
RpcCallInProgressPB* resp) {
std::lock_guard<simple_spinlock> l(lock_);
resp->mutable_header()->CopyFrom(header_);
resp->set_micros_elapsed((MonoTime::Now() - start_time_).ToMicroseconds());
switch (state_) {
case READY:
// Don't bother setting a state for "READY" since we don't expose a call
// until it's at least on the queue of a connection.
break;
case ON_OUTBOUND_QUEUE:
resp->set_state(RpcCallInProgressPB::ON_OUTBOUND_QUEUE);
break;
case SENDING:
resp->set_state(RpcCallInProgressPB::SENDING);
break;
case SENT:
resp->set_state(RpcCallInProgressPB::SENT);
break;
case NEGOTIATION_TIMED_OUT:
resp->set_state(RpcCallInProgressPB::NEGOTIATION_TIMED_OUT);
break;
case TIMED_OUT:
resp->set_state(RpcCallInProgressPB::TIMED_OUT);
break;
case CANCELLED:
resp->set_state(RpcCallInProgressPB::CANCELLED);
break;
case FINISHED_NEGOTIATION_ERROR:
resp->set_state(RpcCallInProgressPB::FINISHED_NEGOTIATION_ERROR);
break;
case FINISHED_ERROR:
resp->set_state(RpcCallInProgressPB::FINISHED_ERROR);
break;
case FINISHED_SUCCESS:
resp->set_state(RpcCallInProgressPB::FINISHED_SUCCESS);
break;
}
}
///
/// CallResponse
///
CallResponse::CallResponse()
: parsed_(false) {
}
Status CallResponse::GetSidecar(int idx, Slice* sidecar) const {
DCHECK(parsed_);
if (idx < 0 || idx >= header_.sidecar_offsets_size()) {
return Status::InvalidArgument(strings::Substitute(
"Index $0 does not reference a valid sidecar", idx));
}
*sidecar = sidecar_slices_[idx];
return Status::OK();
}
Status CallResponse::ParseFrom(gscoped_ptr<InboundTransfer> transfer) {
CHECK(!parsed_);
RETURN_NOT_OK(serialization::ParseMessage(transfer->data(), &header_,
&serialized_response_));
// Use information from header to extract the payload slices.
RETURN_NOT_OK(RpcSidecar::ParseSidecars(header_.sidecar_offsets(),
serialized_response_, sidecar_slices_));
if (header_.sidecar_offsets_size() > 0) {
serialized_response_ =
Slice(serialized_response_.data(), header_.sidecar_offsets(0));
}
transfer_.swap(transfer);
parsed_ = true;
return Status::OK();
}
} // namespace rpc
} // namespace kudu