blob: fd4c64420d6063ba879685d8ed88497e35bb9852 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/rpcz_store.h"
#include <algorithm> // IWYU pragma: keep
#include <array>
#include <cstdint>
#include <mutex> // for unique_lock
#include <ostream>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/message.h>
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/walltime.h"
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/service_if.h"
#include "kudu/util/atomic.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
#include "kudu/util/trace.h"
#include "kudu/util/trace_metrics.h"
// Consulted by RpczStore::LogTrace() for every call passed to AddCall(). When
// true, LogTrace() dumps the call's full trace at INFO level, unless the call
// already exceeded 75% of its client timeout, in which case the WARNING path
// in LogTrace() handles it and returns early.
// The 'runtime' tag presumably marks the flag as safe to flip on a live
// server; confirm against kudu/util/flag_tags.h.
DEFINE_bool(rpc_dump_all_traces, false,
"If true, dump all RPC traces at INFO level");
TAG_FLAG(rpc_dump_all_traces, advanced);
TAG_FLAG(rpc_dump_all_traces, runtime);
// Compared in RpczStore::LogTrace() against the call's total duration
// (call->timing().TotalDuration()). LogTrace() applies it only when
// --rpc_dump_all_traces is false and the client-timeout WARNING path did not
// fire. When the threshold is exceeded, LogTrace() logs the call's ToString(),
// its request metrics as JSON, and its trace (if non-empty) at INFO level.
DEFINE_int32(rpc_duration_too_long_ms, 1000,
"Threshold (in milliseconds) above which a RPC is considered too long and its "
"duration and method name are logged at INFO level. The time measured is between "
"when a RPC is accepted and when its call handler completes.");
TAG_FLAG(rpc_duration_too_long_ms, advanced);
TAG_FLAG(rpc_duration_too_long_ms, runtime);
using std::pair;
using std::string;
using std::vector;
using std::unique_ptr;
namespace kudu {
namespace rpc {
// Exclusive upper bounds, in milliseconds, of the latency buckets. A call is
// placed in the first bucket whose threshold is strictly greater than its
// duration. Calls at or beyond the last threshold go into one extra trailing
// bucket, which is why kNumBuckets is one larger than the threshold count.
static constexpr int kBucketThresholdsMs[] = {10, 100, 1000};
static constexpr int kNumBuckets = arraysize(kBucketThresholdsMs) + 1;

// Minimum age, in milliseconds, that a bucket's current sample must reach
// before the bucket accepts a replacement. Each bucket is therefore
// re-sampled at most about once per this interval.
static constexpr int kSampleIntervalMs = 1000;
// An instance of this class is created for each RPC method implemented
// on the server. It keeps several recent samples for each RPC, currently
// based on fixed latency buckets (see kBucketThresholdsMs).
class MethodSampler {
 public:
  MethodSampler() = default;
  ~MethodSampler() = default;

  // Potentially sample a single call. The call replaces the current sample of
  // its latency bucket only if that sample is older than kSampleIntervalMs.
  void SampleCall(InboundCall* call);

  // Append to 'pb' the current sample of every bucket that has been sampled
  // at least once.
  void GetSamplePBs(RpczMethodPB* pb);

 private:
  // Convert the trace metrics from 't' into protobuf entries in 'sample_pb'.
  // This function recurses through the parent-child relationship graph,
  // keeping the current tree path in 'child_path' (empty at the root).
  static void GetTraceMetrics(const Trace& t,
                              const string& child_path,
                              RpczSamplePB* sample_pb);

  // An individual recorded sample.
  struct Sample {
    RequestHeader header;
    scoped_refptr<Trace> trace;
    int duration_ms;
  };

  // A sample, including the particular time at which it was
  // sampled, and a lock protecting it.
  struct SampleBucket {
    // 'sample()' value-initializes the aggregate. As a result 'duration_ms'
    // starts at 0 instead of holding an indeterminate value until the first
    // sample is stored.
    SampleBucket() : last_sample_time(0), sample() {}

    // Monotonic time in microseconds (GetMonoTimeMicros()) at which 'sample'
    // was last replaced. 0 means the bucket has never been sampled.
    AtomicInt<int64_t> last_sample_time;
    // Guards 'sample'.
    simple_spinlock sample_lock;
    Sample sample;
  };

  std::array<SampleBucket, kNumBuckets> buckets_;

  DISALLOW_COPY_AND_ASSIGN(MethodSampler);
};
// Returns the MethodSampler associated with the method that 'call' targets,
// creating and registering one on first use. Returns nullptr when the call
// carries no method info, since there is nothing to key a sampler on.
MethodSampler* RpczStore::SamplerForCall(InboundCall* call) {
  if (PREDICT_FALSE(!call->method_info())) {
    return nullptr;
  }

  // Fast path: after warm-up the sampler nearly always exists already, so a
  // shared (reader) lock is enough to find it.
  {
    shared_lock<rw_spinlock> read_guard(samplers_lock_.get_lock());
    const auto found = method_samplers_.find(call->method_info());
    if (PREDICT_TRUE(found != method_samplers_.end())) {
      return found->second.get();
    }
  }

  // Slow path: allocate the candidate before taking the exclusive lock. Under
  // that lock, install it unless another thread registered a sampler for the
  // same method in the meantime.
  unique_ptr<MethodSampler> candidate(new MethodSampler());
  std::lock_guard<percpu_rwlock> write_guard(samplers_lock_);
  auto& slot = method_samplers_[call->method_info()];
  if (!slot) {
    slot = std::move(candidate);
  }
  return slot.get();
}
// Record 'call' as the current sample of its latency bucket, if that bucket's
// existing sample is older than kSampleIntervalMs. Sampling is best-effort: if
// another thread holds the bucket's lock, this call is simply not sampled.
void MethodSampler::SampleCall(InboundCall* call) {
  // First determine which sample bucket to put this in. It is the first bucket
  // whose threshold exceeds the call's duration, or else the last (overflow)
  // bucket.
  int duration_ms = call->timing().TotalDuration().ToMilliseconds();
  SampleBucket* bucket = &buckets_[kNumBuckets - 1];
  for (int i = 0; i < kNumBuckets - 1; i++) {
    if (duration_ms < kBucketThresholdsMs[i]) {
      bucket = &buckets_[i];
      break;
    }
  }

  const int64_t kSampleIntervalUs = static_cast<int64_t>(kSampleIntervalMs) * 1000;
  MicrosecondsInt64 now = GetMonoTimeMicros();

  // Cheap lock-free pre-check so that most calls return without touching the lock.
  if (now - bucket->last_sample_time.Load() <= kSampleIntervalUs) {
    return;
  }

  // Build the replacement before locking so the critical section is just a swap.
  Sample new_sample = {call->header(), call->trace(), duration_ms};
  {
    std::unique_lock<simple_spinlock> lock(bucket->sample_lock, std::try_to_lock);
    // If another thread is already taking a sample, it's not worth waiting.
    if (!lock.owns_lock()) {
      return;
    }
    // Re-check under the lock. Another thread may have passed the pre-check
    // concurrently and already stored a sample, possibly with a later 'now'
    // than ours. Without this check the bucket could be sampled twice within
    // one interval, and 'last_sample_time' could move backwards.
    if (now - bucket->last_sample_time.Load() <= kSampleIntervalUs) {
      return;
    }
    std::swap(bucket->sample, new_sample);
    bucket->last_sample_time.Store(now);
  }
  // 'new_sample' now holds the displaced sample. It is destroyed when this
  // function returns, after the spinlock has already been released.
  VLOG(2) << "Sampled call " << call->ToString();
}
void MethodSampler::GetTraceMetrics(const Trace& t,
const string& child_path,
RpczSamplePB* sample_pb) {
auto m = t.metrics().Get();
for (const auto& e : m) {
auto* pb = sample_pb->add_metrics();
pb->set_key(e.first);
pb->set_value(e.second);
if (!child_path.empty()) {
pb->set_child_path(child_path);
}
}
for (const auto& child_pair : t.ChildTraces()) {
string path = child_path;
if (!path.empty()) {
path += ".";
}
path += child_pair.first.ToString();
GetTraceMetrics(*child_pair.second.get(), path, sample_pb);
}
}
// Appends one RpczSamplePB to 'method_pb' for every bucket that has been
// sampled at least once. Each entry holds the sampled call's header, its
// duration, its trace dump (with time deltas), and its flattened trace metrics.
void MethodSampler::GetSamplePBs(RpczMethodPB* method_pb) {
  for (auto& bucket : buckets_) {
    // A zero timestamp means SampleCall() has never stored a sample here.
    if (bucket.last_sample_time.Load() == 0) continue;

    auto* sample_pb = method_pb->add_samples();
    scoped_refptr<Trace> trace;
    {
      // Hold the bucket's spinlock only long enough to snapshot the sample.
      // While it is held, concurrent SampleCall()s fail their try-lock and
      // skip sampling, so the slow trace formatting happens outside the lock.
      // The ref taken here keeps the trace alive even if SampleCall() swaps
      // the bucket's sample out in the meantime.
      std::lock_guard<simple_spinlock> lock(bucket.sample_lock);
      sample_pb->mutable_header()->CopyFrom(bucket.sample.header);
      sample_pb->set_duration_ms(bucket.sample.duration_ms);
      trace = bucket.sample.trace;
    }
    // NOTE(review): relies on Trace's own internal synchronization for
    // dumping, as the original already did with respect to other trace
    // writers. Confirm if Trace's locking changes.
    sample_pb->set_trace(trace->DumpToString(Trace::INCLUDE_TIME_DELTAS));
    GetTraceMetrics(*trace.get(), "", sample_pb);
  }
}
// Defined out of line in this file, where MethodSampler is a complete type.
// Presumably this is needed so that the unique_ptr<MethodSampler> values held
// in method_samplers_ can be destroyed; confirm against rpcz_store.h.
RpczStore::RpczStore() = default;
RpczStore::~RpczStore() = default;
// Entry point for a finished call. The call is always considered for logging,
// and it is offered to its method's sampler whenever one can be resolved.
void RpczStore::AddCall(InboundCall* call) {
  LogTrace(call);
  MethodSampler* const sampler = SamplerForCall(call);
  if (PREDICT_TRUE(sampler != nullptr)) {
    sampler->SampleCall(call);
  }
}
void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req,
DumpRpczStoreResponsePB* resp) {
vector<pair<RpcMethodInfo*, MethodSampler*>> samplers;
{
shared_lock<rw_spinlock> l(samplers_lock_.get_lock());
for (const auto& p : method_samplers_) {
samplers.emplace_back(p.first, p.second.get());
}
}
for (const auto& p : samplers) {
auto* sampler = p.second;
RpczMethodPB* method_pb = resp->add_methods();
// TODO: use the actual RPC name instead of the request type name.
// Currently this isn't conveniently plumbed here, but the type name
// is close enough.
method_pb->set_method_name(p.first->req_prototype->GetTypeName());
sampler->GetSamplePBs(method_pb);
}
}
// Logs slow calls. The checks apply in order, and only the first that fires logs:
//  1. If the client supplied a positive timeout and the call took more than
//     75% of it, log at WARNING with the full trace.
//  2. Otherwise, if --rpc_dump_all_traces is set, dump the trace at INFO.
//  3. Otherwise, if the call took longer than --rpc_duration_too_long_ms, log
//     its request metrics and trace at INFO.
void RpczStore::LogTrace(InboundCall* call) {
  int duration_ms = call->timing().TotalDuration().ToMilliseconds();
  // Use the public accessor, consistent with MethodSampler::SampleCall().
  const RequestHeader& header = call->header();

  if (header.has_timeout_millis() && header.timeout_millis() > 0) {
    // Multiply by a double literal. With '0.75f' the product was computed in
    // single precision and only then widened to double, which loses
    // precision for very large timeouts.
    double log_threshold = header.timeout_millis() * 0.75;
    if (duration_ms > log_threshold) {
      // TODO: consider pushing this onto another thread since it may be slow.
      // The traces may also be too large to fit in a log message.
      LOG(WARNING) << call->ToString() << " took " << duration_ms << "ms (client timeout "
                   << header.timeout_millis() << ").";
      string s = call->trace()->DumpToString();
      if (!s.empty()) {
        LOG(WARNING) << "Trace:\n" << s;
      }
      return;
    }
  }

  if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) {
    LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. Trace:";
    call->trace()->Dump(&LOG(INFO), true);
  } else if (duration_ms > FLAGS_rpc_duration_too_long_ms) {
    LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. "
              << "Request Metrics: " << call->trace()->MetricsAsJSON() << "\n";
    string s = call->trace()->DumpToString();
    if (!s.empty()) {
      LOG(INFO) << "Trace:\n" << s;
    }
  }
}
} // namespace rpc
} // namespace kudu