blob: 146e78f8d8186a5777e4eb21b7390c7920bfec77 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "rpc/rpc-trace.h"
#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <gutil/strings/substitute.h>
#include "common/logging.h"
#include "rpc/rpc-mgr.h"
#include "util/debug-util.h"
#include "util/histogram-metric.h"
#include "util/pretty-printer.h"
#include "util/time.h"
#include "util/webserver.h"
#include "common/names.h"
using namespace impala;
using namespace rapidjson;
using namespace strings;
// Metric key format for rpc call duration (total processing time) metrics.
// The "$0" placeholder is substituted with "<server_name>.<method_name>".
const string RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY = "rpc-method.$0.call_duration";
// Metric key format for rpc read metrics. Same "$0" substitution as above.
const string RPC_READ_TIME_DISTRIBUTION_METRIC_KEY = "rpc-method.$0.read_duration";
// Metric key format for rpc write metrics. Same "$0" substitution as above.
const string RPC_WRITE_TIME_DISTRIBUTION_METRIC_KEY = "rpc-method.$0.write_duration";
// Singleton class to keep track of all RpcEventHandlers, and to render them to a
// web-based summary page (/rpcz). Registered as the target of the /rpcz and
// /rpcz_reset webserver endpoints in InitRpcEventTracing().
class RpcEventHandlerManager {
 public:
  // 'rpc_mgr' may be null; see rpc_mgr_ below.
  RpcEventHandlerManager(RpcMgr* rpc_mgr) : rpc_mgr_(rpc_mgr) {}

  // Adds an event handler to the list of those tracked. Thread-safe.
  void RegisterEventHandler(RpcEventHandler* event_handler);

  // Produces Json for /rpcz with summary information for all Rpc methods.
  // { "servers": [
  //   .. list of output from RpcEventHandler::ToJson()
  // ] }
  void JsonCallback(const Webserver::WebRequest& req, Document* document);

  // Resets method statistics. Takes two optional arguments: 'server' and 'method'. If
  // neither are specified, all server statistics are reset. If only the former is
  // specified, all statistics for that server are reset. If both arguments are present,
  // resets the statistics for a single method only. Produces no JSON output.
  void ResetCallback(const Webserver::WebRequest& req, Document* document);

 private:
  // Protects event_handlers_
  mutex lock_;

  // List of all event handlers in the system - once an event handler is registered, it
  // should never be deleted. Event handlers are owned by the TProcessor which calls them,
  // which are themselves owned by a ThriftServer. Since we do not terminate ThriftServers
  // after they are started, event handlers have a lifetime equivalent to the length of
  // the process.
  vector<RpcEventHandler*> event_handlers_;

  // Points to an RpcMgr. If this is not null, then its metrics will be included in the
  // output of JsonCallback. Not owned, but the object must be guaranteed to live as long
  // as the process lives.
  RpcMgr* rpc_mgr_ = nullptr;
};
// Only instance of RpcEventHandlerManager. Created by InitRpcEventTracing();
// stays null if event tracing was never initialized (RpcEventHandler's
// constructor checks for this).
scoped_ptr<RpcEventHandlerManager> handler_manager;
// Creates the process-wide RpcEventHandlerManager and, when a webserver is
// provided, registers the /rpcz display page and the /rpcz_reset endpoint.
void impala::InitRpcEventTracing(Webserver* webserver, RpcMgr* rpc_mgr) {
  handler_manager.reset(new RpcEventHandlerManager(rpc_mgr));
  if (webserver == nullptr) return;
  RpcEventHandlerManager* manager = handler_manager.get();
  Webserver::UrlCallback json_callback =
      [manager](const Webserver::WebRequest& req, Document* document) {
        manager->JsonCallback(req, document);
      };
  webserver->RegisterUrlCallback("/rpcz", "rpcz.tmpl", json_callback, true);
  Webserver::UrlCallback reset_callback =
      [manager](const Webserver::WebRequest& req, Document* document) {
        manager->ResetCallback(req, document);
      };
  webserver->RegisterUrlCallback("/rpcz_reset", "", reset_callback, false);
}
// Appends 'event_handler' (must be non-null) to the tracked set so it shows up
// in /rpcz output. Handlers are never removed; see event_handlers_.
void RpcEventHandlerManager::RegisterEventHandler(RpcEventHandler* event_handler) {
  DCHECK(event_handler != nullptr);
  lock_guard<mutex> registry_lock(lock_);
  event_handlers_.push_back(event_handler);
}
// Builds the /rpcz JSON document: one entry per registered event handler under
// "servers", plus the RpcMgr's own metrics when one was supplied.
void RpcEventHandlerManager::JsonCallback(const Webserver::WebRequest& req,
    Document* document) {
  lock_guard<mutex> l(lock_);
  Document::AllocatorType& alloc = document->GetAllocator();
  Value servers(kArrayType);
  for (RpcEventHandler* handler : event_handlers_) {
    Value server_summary(kObjectType);
    handler->ToJson(&server_summary, document);
    servers.PushBack(server_summary, alloc);
  }
  document->AddMember("servers", servers, alloc);
  if (rpc_mgr_ != nullptr) rpc_mgr_->ToJson(document);
}
// Handles /rpcz_reset. Optional query arguments 'server' and 'method' narrow
// the scope: with neither, every handler is fully reset; with only 'server',
// that server's handler is fully reset; with both, one method's statistics on
// that server are reset. Emits no JSON.
void RpcEventHandlerManager::ResetCallback(const Webserver::WebRequest& req,
    Document* document) {
  const auto& args = req.parsed_args;
  const auto server_arg = args.find("server");
  const auto method_arg = args.find("method");
  const bool all_servers = (server_arg == args.end());
  const bool all_methods = (method_arg == args.end());
  lock_guard<mutex> l(lock_);
  for (RpcEventHandler* handler : event_handlers_) {
    // Skip handlers that don't match an explicitly requested server name.
    if (!all_servers && handler->server_name() != server_arg->second) continue;
    if (all_methods) {
      handler->ResetAll();
    } else {
      handler->Reset(method_arg->second);
    }
    // A specific server was requested and has been handled; nothing else to do.
    if (!all_servers) return;
  }
}
void RpcEventHandler::Reset(const string& method_name) {
lock_guard<mutex> l(method_map_lock_);
MethodMap::iterator it = method_map_.find(method_name);
if (it == method_map_.end()) return;
it->second->processing_time_distribution->Reset();
it->second->read_time_distribution->Reset();
it->second->write_time_distribution->Reset();
it->second->num_in_flight.Store(0L);
}
void RpcEventHandler::ResetAll() {
lock_guard<mutex> l(method_map_lock_);
for (const MethodMap::value_type& method: method_map_) {
method.second->processing_time_distribution->Reset();
method.second->read_time_distribution->Reset();
method.second->write_time_distribution->Reset();
method.second->num_in_flight.Store(0L);
}
}
// Constructs a handler for the Thrift server 'server_name', registering its
// metrics in 'metrics'. If event tracing was initialized (handler_manager is
// non-null), registers itself so it appears on /rpcz. 'vlog_level' controls
// the verbosity level used when logging individual RPC calls.
RpcEventHandler::RpcEventHandler(
    const string& server_name, MetricGroup* metrics, int vlog_level)
  : server_name_(server_name), metrics_(metrics), vlog_level_(vlog_level) {
  if (handler_manager.get() != nullptr) handler_manager->RegisterEventHandler(this);
}
// Renders this server's name and per-method statistics into 'server', a
// rapidjson object owned by 'document'.
void RpcEventHandler::ToJson(Value* server, Document* document) {
  lock_guard<mutex> l(method_map_lock_);
  Document::AllocatorType& alloc = document->GetAllocator();
  Value name(server_name_.c_str(), alloc);
  server->AddMember("name", name, alloc);
  Value methods(kArrayType);
  for (const auto& entry : method_map_) {
    const auto& descriptor = entry.second;
    Value method(kObjectType);
    Value method_name(entry.first.c_str(), alloc);
    method.AddMember("name", method_name, alloc);
    // Value's (const char*, allocator) constructor copies the string, so
    // passing c_str() of a temporary is safe here.
    Value summary(
        descriptor->processing_time_distribution->ToHumanReadable().c_str(), alloc);
    method.AddMember("summary", summary, alloc);
    Value read_stats(
        descriptor->read_time_distribution->ToHumanReadable().c_str(), alloc);
    method.AddMember("read", read_stats, alloc);
    Value write_stats(
        descriptor->write_time_distribution->ToHumanReadable().c_str(), alloc);
    method.AddMember("write", write_stats, alloc);
    method.AddMember("in_flight", descriptor->num_in_flight.Load(), alloc);
    Value server_name(server_name_.c_str(), alloc);
    method.AddMember("server_name", server_name, alloc);
    methods.PushBack(method, alloc);
  }
  server->AddMember("methods", methods, alloc);
}
void* RpcEventHandler::getContext(const char* fn_name, void* server_context) {
ThriftServer::ConnectionContext* cnxn_ctx =
reinterpret_cast<ThriftServer::ConnectionContext*>(server_context);
MethodMap::iterator it;
{
lock_guard<mutex> l(method_map_lock_);
it = method_map_.find(fn_name);
if (it == method_map_.end()) {
MethodDescriptor* descriptor = new MethodDescriptor();
descriptor->name = fn_name;
const string& rpc_name = Substitute(RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY,
Substitute("$0.$1", server_name_, descriptor->name));
const TMetricDef& def =
MakeTMetricDef(rpc_name, TMetricKind::HISTOGRAM, TUnit::TIME_US);
constexpr int64_t SIXTY_MINUTES_IN_US = 60LL * 1000000LL * 60LL;
// Store processing times of up to 60 minutes with 3 sig. fig.
descriptor->processing_time_distribution =
metrics_->RegisterMetric(new HistogramMetric(def, SIXTY_MINUTES_IN_US, 3));
const string& read_rpc_name = Substitute(RPC_READ_TIME_DISTRIBUTION_METRIC_KEY,
Substitute("$0.$1", server_name_, descriptor->name));
const TMetricDef& read_def =
MakeTMetricDef(read_rpc_name, TMetricKind::HISTOGRAM, TUnit::TIME_US);
descriptor->read_time_distribution =
metrics_->RegisterMetric(new HistogramMetric(read_def, SIXTY_MINUTES_IN_US, 3));
const string& write_rpc_name = Substitute(RPC_WRITE_TIME_DISTRIBUTION_METRIC_KEY,
Substitute("$0.$1", server_name_, descriptor->name));
const TMetricDef& write_def =
MakeTMetricDef(write_rpc_name, TMetricKind::HISTOGRAM, TUnit::TIME_US);
descriptor->write_time_distribution = metrics_->RegisterMetric(
new HistogramMetric(write_def, SIXTY_MINUTES_IN_US, 3));
it = method_map_.insert(make_pair(descriptor->name, descriptor)).first;
}
}
it->second->num_in_flight.Add(1);
// TODO: Consider pooling these
InvocationContext* ctxt_ptr =
new InvocationContext(GetMonoTimeMicros(), cnxn_ctx, it->second);
SetThreadRPCContext(ctxt_ptr);
VLOG(vlog_level_) << "RPC call: " << string(fn_name) << "(from "
<< TNetworkAddressToString(ctxt_ptr->cnxn_ctx->network_address)
<< ")";
return reinterpret_cast<void*>(ctxt_ptr);
}
void RpcEventHandler::freeContext(void* ctx, const char* /* fn_name */) {
InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
DCHECK(GetThreadRPCContext() == rpc_ctx);
SetThreadRPCContext(nullptr);
rpc_ctx->UnRegister();
}
// postWrite callback occurs after RPC write completes
void RpcEventHandler::postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
rpc_ctx->write_end_us = GetMonoTimeMicros();
const int64_t elapsed_time = rpc_ctx->write_end_us - rpc_ctx->start_time_us;
const int64_t write_time = rpc_ctx->write_end_us - rpc_ctx->write_start_us;
const string& call_name = string(fn_name);
// TODO: bytes is always 0 since TTransport does not track write count.
VLOG(vlog_level_) << "RPC call: " << server_name_ << ":" << call_name << " from "
<< TNetworkAddressToString(rpc_ctx->cnxn_ctx->network_address)
<< " took "
<< PrettyPrinter::Print(elapsed_time * 1000L, TUnit::TIME_NS);
MethodDescriptor* descriptor = rpc_ctx->method_descriptor;
descriptor->num_in_flight.Add(-1);
descriptor->processing_time_distribution->Update(elapsed_time);
descriptor->write_time_distribution->Update(write_time);
}
// preRead callback occurs before RPC read starts
void RpcEventHandler::preRead(void* ctx, const char* fn_name) {
InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
rpc_ctx->read_start_us = GetMonoTimeMicros();
}
// postRead callback occurs after RPC read completes
void RpcEventHandler::postRead(void* ctx, const char* fn_name, uint32_t bytes) {
InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
rpc_ctx->read_end_us = GetMonoTimeMicros();
int64_t elapsed_time = rpc_ctx->read_end_us - rpc_ctx->start_time_us;
rpc_ctx->method_descriptor->read_time_distribution->Update(elapsed_time);
}
// preWrite callback occurs before RPC write starts (original comment wrongly
// said "read"). Records the write start timestamp so postWrite() can compute
// the write duration.
void RpcEventHandler::preWrite(void* ctx, const char* fn_name) {
  InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx);
  rpc_ctx->write_start_us = GetMonoTimeMicros();
}
// Thread-local pointer to the InvocationContext of the RPC currently being
// processed by this thread. Set in getContext() and cleared in freeContext();
// nullptr when the thread is not handling an RPC.
__thread RpcEventHandler::InvocationContext* __rpc_context__;

// Installs 'ctxt_ptr' as the calling thread's active RPC context.
void RpcEventHandler::SetThreadRPCContext(RpcEventHandler::InvocationContext* ctxt_ptr) {
  __rpc_context__ = ctxt_ptr;
}

// Returns the calling thread's active RPC context, or nullptr if none.
RpcEventHandler::InvocationContext* RpcEventHandler::GetThreadRPCContext() {
  return __rpc_context__;
}
// Takes an additional reference on this context. The DCHECK asserts that the
// context is still alive, i.e. the RpcEventHandler's own reference (taken at
// construction) has not yet been released.
void RpcEventHandler::InvocationContext::Register() {
  DCHECK(refcnt_.Load() >= 1); // Should be registered with RpcEventHandler
  refcnt_.Add(1);
}
// Drops one reference; whoever releases the final reference deletes 'this'.
void RpcEventHandler::InvocationContext::UnRegister() {
  const int32_t remaining = refcnt_.Add(-1);
  DCHECK(remaining >= 0);
  if (remaining == 0) delete this;
}
// If the caller holds the last reference, accumulates the measured read and
// write durations (converted from microseconds to nanoseconds) into the
// caller's counters, releases the context and returns true. Otherwise leaves
// the counters untouched and returns false.
bool RpcEventHandler::InvocationContext::UnRegisterCompleted(
    uint64_t& read_ns, uint64_t& write_ns) {
  const int32_t refs = refcnt_.Load();
  DCHECK(refs >= 1);
  if (refs != 1) return false;
  // Last reference: fold the timings in before releasing.
  if (read_end_us > read_start_us) {
    read_ns += (read_end_us - read_start_us) * 1000L;
  }
  if (write_end_us > write_start_us) {
    write_ns += (write_end_us - write_start_us) * 1000L;
  }
  UnRegister();
  return true;
}