| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/rpc/rpcz_store.h" |
| |
| #include <algorithm> // IWYU pragma: keep |
| #include <array> |
| #include <cstdint> |
| #include <mutex> // for unique_lock |
| #include <ostream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| #include <google/protobuf/message.h> |
| |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/stringpiece.h" |
| #include "kudu/gutil/walltime.h" |
| #include "kudu/rpc/inbound_call.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #include "kudu/rpc/rpc_introspection.pb.h" |
| #include "kudu/rpc/service_if.h" |
| #include "kudu/util/atomic.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/trace.h" |
| #include "kudu/util/trace_metrics.h" |
| |
| DEFINE_bool(rpc_dump_all_traces, false, |
| "If true, dump all RPC traces at INFO level"); |
| TAG_FLAG(rpc_dump_all_traces, advanced); |
| TAG_FLAG(rpc_dump_all_traces, runtime); |
| |
| DEFINE_int32(rpc_duration_too_long_ms, 1000, |
| "Threshold (in milliseconds) above which a RPC is considered too long and its " |
| "duration and method name are logged at INFO level. The time measured is between " |
| "when a RPC is accepted and when its call handler completes."); |
| TAG_FLAG(rpc_duration_too_long_ms, advanced); |
| TAG_FLAG(rpc_duration_too_long_ms, runtime); |
| |
| using std::pair; |
| using std::string; |
| using std::vector; |
| using std::unique_ptr; |
| |
| namespace kudu { |
| namespace rpc { |
| |
| // Sample an RPC call once every N milliseconds within each |
| // bucket. If the current sample in a latency bucket is older |
| // than this threshold, a new sample will be taken. |
| static const int kSampleIntervalMs = 1000; |
| |
| static const int kBucketThresholdsMs[] = {10, 100, 1000}; |
| static constexpr int kNumBuckets = arraysize(kBucketThresholdsMs) + 1; |
| |
| // An instance of this class is created For each RPC method implemented |
| // on the server. It keeps several recent samples for each RPC, currently |
| // based on fixed time buckets. |
| class MethodSampler { |
| public: |
| MethodSampler() {} |
| ~MethodSampler() {} |
| |
| // Potentially sample a single call. |
| void SampleCall(InboundCall* call); |
| |
| // Dump the current samples. |
| void GetSamplePBs(RpczMethodPB* pb); |
| |
| private: |
| // Convert the trace metrics from 't' into protobuf entries in 'sample_pb'. |
| // This function recurses through the parent-child relationship graph, |
| // keeping the current tree path in 'child_path' (empty at the root). |
| static void GetTraceMetrics(const Trace& t, |
| const string& child_path, |
| RpczSamplePB* sample_pb); |
| |
| // An individual recorded sample. |
| struct Sample { |
| RequestHeader header; |
| scoped_refptr<Trace> trace; |
| int duration_ms; |
| }; |
| |
| // A sample, including the particular time at which it was |
| // sampled, and a lock protecting it. |
| struct SampleBucket { |
| SampleBucket() : last_sample_time(0) {} |
| |
| AtomicInt<int64_t> last_sample_time; |
| simple_spinlock sample_lock; |
| Sample sample; |
| }; |
| std::array<SampleBucket, kNumBuckets> buckets_; |
| |
| DISALLOW_COPY_AND_ASSIGN(MethodSampler); |
| }; |
| |
| MethodSampler* RpczStore::SamplerForCall(InboundCall* call) { |
| if (PREDICT_FALSE(!call->method_info())) { |
| return nullptr; |
| } |
| |
| // Most likely, we already have a sampler created for the call. |
| { |
| shared_lock<rw_spinlock> l(samplers_lock_.get_lock()); |
| auto it = method_samplers_.find(call->method_info()); |
| if (PREDICT_TRUE(it != method_samplers_.end())) { |
| return it->second.get(); |
| } |
| } |
| |
| // If missing, create a new sampler for this method and try to insert it. |
| unique_ptr<MethodSampler> ms(new MethodSampler()); |
| std::lock_guard<percpu_rwlock> lock(samplers_lock_); |
| auto it = method_samplers_.find(call->method_info()); |
| if (it != method_samplers_.end()) { |
| return it->second.get(); |
| } |
| auto* ret = ms.get(); |
| method_samplers_[call->method_info()] = std::move(ms); |
| return ret; |
| } |
| |
| void MethodSampler::SampleCall(InboundCall* call) { |
| // First determine which sample bucket to put this in. |
| int duration_ms = call->timing().TotalDuration().ToMilliseconds(); |
| |
| SampleBucket* bucket = &buckets_[kNumBuckets - 1]; |
| for (int i = 0 ; i < kNumBuckets - 1; i++) { |
| if (duration_ms < kBucketThresholdsMs[i]) { |
| bucket = &buckets_[i]; |
| break; |
| } |
| } |
| |
| MicrosecondsInt64 now = GetMonoTimeMicros(); |
| int64_t us_since_trace = now - bucket->last_sample_time.Load(); |
| if (us_since_trace > kSampleIntervalMs * 1000) { |
| Sample new_sample = {call->header(), call->trace(), duration_ms}; |
| { |
| std::unique_lock<simple_spinlock> lock(bucket->sample_lock, std::try_to_lock); |
| // If another thread is already taking a sample, it's not worth waiting. |
| if (!lock.owns_lock()) { |
| return; |
| } |
| std::swap(bucket->sample, new_sample); |
| bucket->last_sample_time.Store(now); |
| } |
| VLOG(2) << "Sampled call " << call->ToString(); |
| } |
| } |
| |
| void MethodSampler::GetTraceMetrics(const Trace& t, |
| const string& child_path, |
| RpczSamplePB* sample_pb) { |
| auto m = t.metrics().Get(); |
| for (const auto& e : m) { |
| auto* pb = sample_pb->add_metrics(); |
| pb->set_key(e.first); |
| pb->set_value(e.second); |
| if (!child_path.empty()) { |
| pb->set_child_path(child_path); |
| } |
| } |
| |
| for (const auto& child_pair : t.ChildTraces()) { |
| string path = child_path; |
| if (!path.empty()) { |
| path += "."; |
| } |
| path += child_pair.first.ToString(); |
| GetTraceMetrics(*child_pair.second.get(), path, sample_pb); |
| } |
| } |
| |
| void MethodSampler::GetSamplePBs(RpczMethodPB* method_pb) { |
| for (auto& bucket : buckets_) { |
| if (bucket.last_sample_time.Load() == 0) continue; |
| |
| std::unique_lock<simple_spinlock> lock(bucket.sample_lock); |
| auto* sample_pb = method_pb->add_samples(); |
| sample_pb->mutable_header()->CopyFrom(bucket.sample.header); |
| sample_pb->set_trace(bucket.sample.trace->DumpToString(Trace::INCLUDE_TIME_DELTAS)); |
| |
| GetTraceMetrics(*bucket.sample.trace.get(), "", sample_pb); |
| sample_pb->set_duration_ms(bucket.sample.duration_ms); |
| } |
| } |
| |
| RpczStore::RpczStore() {} |
| RpczStore::~RpczStore() {} |
| |
| void RpczStore::AddCall(InboundCall* call) { |
| LogTrace(call); |
| auto* sampler = SamplerForCall(call); |
| if (PREDICT_FALSE(!sampler)) return; |
| |
| sampler->SampleCall(call); |
| } |
| |
| void RpczStore::DumpPB(const DumpRpczStoreRequestPB& req, |
| DumpRpczStoreResponsePB* resp) { |
| vector<pair<RpcMethodInfo*, MethodSampler*>> samplers; |
| { |
| shared_lock<rw_spinlock> l(samplers_lock_.get_lock()); |
| for (const auto& p : method_samplers_) { |
| samplers.emplace_back(p.first, p.second.get()); |
| } |
| } |
| |
| for (const auto& p : samplers) { |
| auto* sampler = p.second; |
| |
| RpczMethodPB* method_pb = resp->add_methods(); |
| // TODO: use the actual RPC name instead of the request type name. |
| // Currently this isn't conveniently plumbed here, but the type name |
| // is close enough. |
| method_pb->set_method_name(p.first->req_prototype->GetTypeName()); |
| sampler->GetSamplePBs(method_pb); |
| } |
| } |
| |
| void RpczStore::LogTrace(InboundCall* call) { |
| int duration_ms = call->timing().TotalDuration().ToMilliseconds(); |
| |
| if (call->header_.has_timeout_millis() && call->header_.timeout_millis() > 0) { |
| double log_threshold = call->header_.timeout_millis() * 0.75f; |
| if (duration_ms > log_threshold) { |
| // TODO: consider pushing this onto another thread since it may be slow. |
| // The traces may also be too large to fit in a log message. |
| LOG(WARNING) << call->ToString() << " took " << duration_ms << "ms (client timeout " |
| << call->header_.timeout_millis() << ")."; |
| string s = call->trace()->DumpToString(); |
| if (!s.empty()) { |
| LOG(WARNING) << "Trace:\n" << s; |
| } |
| return; |
| } |
| } |
| |
| if (PREDICT_FALSE(FLAGS_rpc_dump_all_traces)) { |
| LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. Trace:"; |
| call->trace()->Dump(&LOG(INFO), true); |
| } else if (duration_ms > FLAGS_rpc_duration_too_long_ms) { |
| LOG(INFO) << call->ToString() << " took " << duration_ms << "ms. " |
| << "Request Metrics: " << call->trace()->MetricsAsJSON() << "\n"; |
| string s = call->trace()->DumpToString(); |
| if (!s.empty()) { |
| LOG(INFO) << "Trace:\n" << s; |
| } |
| } |
| } |
| |
| |
| } // namespace rpc |
| } // namespace kudu |