| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "rpc/rpc-trace.h" |
| |
| #include <boost/bind.hpp> |
| #include <boost/scoped_ptr.hpp> |
| #include <gutil/strings/substitute.h> |
| |
| #include "common/logging.h" |
| #include "rpc/rpc-mgr.h" |
| #include "util/debug-util.h" |
| #include "util/histogram-metric.h" |
| #include "util/pretty-printer.h" |
| #include "util/time.h" |
| #include "util/webserver.h" |
| |
| #include "common/names.h" |
| |
| using namespace impala; |
| using namespace rapidjson; |
| using namespace strings; |
| |
| // Metric key format for rpc call duration metrics. |
| const string RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY = "rpc-method.$0.call_duration"; |
| |
| // Singleton class to keep track of all RpcEventHandlers, and to render them to a |
| // web-based summary page. |
| class RpcEventHandlerManager { |
| public: |
| RpcEventHandlerManager(RpcMgr* rpc_mgr) : rpc_mgr_(rpc_mgr) {} |
| |
| // Adds an event handler to the list of those tracked |
| void RegisterEventHandler(RpcEventHandler* event_handler); |
| |
| // Produces Json for /rpcz with summary information for all Rpc methods. |
| // { "servers": [ |
| // .. list of output from RpcEventHandler::ToJson() |
| // ] } |
| void JsonCallback(const Webserver::WebRequest& req, Document* document); |
| |
| // Resets method statistics. Takes two optional arguments: 'server' and 'method'. If |
| // neither are specified, all server statistics are reset. If only the former is |
| // specified, all statistics for that server are reset. If both arguments are present, |
| // resets the statistics for a single method only. Produces no JSON output. |
| void ResetCallback(const Webserver::WebRequest& req, Document* document); |
| |
| private: |
| // Protects event_handlers_ |
| mutex lock_; |
| |
| // List of all event handlers in the system - once an event handler is registered, it |
| // should never be deleted. Event handlers are owned by the TProcessor which calls them, |
| // which are themselves owned by a ThriftServer. Since we do not terminate ThriftServers |
| // after they are started, event handlers have a lifetime equivalent to the length of |
| // the process. |
| vector<RpcEventHandler*> event_handlers_; |
| |
| // Points to an RpcMgr. If this is not null, then its metrics will be included in the |
| // output of JsonCallback. Not owned, but the object must be guaranteed to live as long |
| // as the process lives. |
| RpcMgr* rpc_mgr_ = nullptr; |
| }; |
| |
| // Only instance of RpcEventHandlerManager |
| scoped_ptr<RpcEventHandlerManager> handler_manager; |
| |
| void impala::InitRpcEventTracing(Webserver* webserver, RpcMgr* rpc_mgr) { |
| handler_manager.reset(new RpcEventHandlerManager(rpc_mgr)); |
| if (webserver != nullptr) { |
| Webserver::UrlCallback json = bind<void>( |
| mem_fn(&RpcEventHandlerManager::JsonCallback), handler_manager.get(), _1, _2); |
| webserver->RegisterUrlCallback("/rpcz", "rpcz.tmpl", json, true); |
| |
| Webserver::UrlCallback reset = bind<void>( |
| mem_fn(&RpcEventHandlerManager::ResetCallback), handler_manager.get(), _1, _2); |
| webserver->RegisterUrlCallback("/rpcz_reset", "", reset, false); |
| } |
| } |
| |
| void RpcEventHandlerManager::RegisterEventHandler(RpcEventHandler* event_handler) { |
| DCHECK(event_handler != nullptr); |
| lock_guard<mutex> l(lock_); |
| event_handlers_.push_back(event_handler); |
| } |
| |
| void RpcEventHandlerManager::JsonCallback(const Webserver::WebRequest& req, |
| Document* document) { |
| lock_guard<mutex> l(lock_); |
| Value servers(kArrayType); |
| for (RpcEventHandler* handler: event_handlers_) { |
| Value server(kObjectType); |
| handler->ToJson(&server, document); |
| servers.PushBack(server, document->GetAllocator()); |
| } |
| document->AddMember("servers", servers, document->GetAllocator()); |
| if (rpc_mgr_ != nullptr) rpc_mgr_->ToJson(document); |
| } |
| |
| void RpcEventHandlerManager::ResetCallback(const Webserver::WebRequest& req, |
| Document* document) { |
| const auto& args = req.parsed_args; |
| Webserver::ArgumentMap::const_iterator server_it = args.find("server"); |
| bool reset_all_servers = (server_it == args.end()); |
| Webserver::ArgumentMap::const_iterator method_it = args.find("method"); |
| bool reset_all_in_server = (method_it == args.end()); |
| lock_guard<mutex> l(lock_); |
| for (RpcEventHandler* handler: event_handlers_) { |
| if (reset_all_servers || handler->server_name() == server_it->second) { |
| if (reset_all_in_server) { |
| handler->ResetAll(); |
| } else { |
| handler->Reset(method_it->second); |
| } |
| if (!reset_all_servers) return; |
| } |
| } |
| } |
| |
| void RpcEventHandler::Reset(const string& method_name) { |
| lock_guard<mutex> l(method_map_lock_); |
| MethodMap::iterator it = method_map_.find(method_name); |
| if (it == method_map_.end()) return; |
| it->second->processing_time_distribution->Reset(); |
| it->second->num_in_flight.Store(0L); |
| } |
| |
| void RpcEventHandler::ResetAll() { |
| lock_guard<mutex> l(method_map_lock_); |
| for (const MethodMap::value_type& method: method_map_) { |
| method.second->processing_time_distribution->Reset(); |
| method.second->num_in_flight.Store(0L); |
| } |
| } |
| |
| RpcEventHandler::RpcEventHandler(const string& server_name, MetricGroup* metrics) : |
| server_name_(server_name), metrics_(metrics) { |
| if (handler_manager.get() != nullptr) handler_manager->RegisterEventHandler(this); |
| } |
| |
| void RpcEventHandler::ToJson(Value* server, Document* document) { |
| lock_guard<mutex> l(method_map_lock_); |
| Value name(server_name_.c_str(), document->GetAllocator()); |
| server->AddMember("name", name, document->GetAllocator()); |
| Value methods(kArrayType); |
| for (const MethodMap::value_type& rpc: method_map_) { |
| Value method(kObjectType); |
| Value method_name(rpc.first.c_str(), document->GetAllocator()); |
| method.AddMember("name", method_name, document->GetAllocator()); |
| const string& human_readable = |
| rpc.second->processing_time_distribution->ToHumanReadable(); |
| Value summary(human_readable.c_str(), document->GetAllocator()); |
| method.AddMember("summary", summary, document->GetAllocator()); |
| method.AddMember("in_flight", rpc.second->num_in_flight.Load(), |
| document->GetAllocator()); |
| Value server_name(server_name_.c_str(), document->GetAllocator()); |
| method.AddMember("server_name", server_name, document->GetAllocator()); |
| methods.PushBack(method, document->GetAllocator()); |
| } |
| server->AddMember("methods", methods, document->GetAllocator()); |
| } |
| |
| void* RpcEventHandler::getContext(const char* fn_name, void* server_context) { |
| ThriftServer::ConnectionContext* cnxn_ctx = |
| reinterpret_cast<ThriftServer::ConnectionContext*>(server_context); |
| MethodMap::iterator it; |
| { |
| lock_guard<mutex> l(method_map_lock_); |
| it = method_map_.find(fn_name); |
| if (it == method_map_.end()) { |
| MethodDescriptor* descriptor = new MethodDescriptor(); |
| descriptor->name = fn_name; |
| const string& rpc_name = Substitute(RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY, |
| Substitute("$0.$1", server_name_, descriptor->name)); |
| const TMetricDef& def = |
| MakeTMetricDef(rpc_name, TMetricKind::HISTOGRAM, TUnit::TIME_MS); |
| constexpr int32_t SIXTY_MINUTES_IN_MS = 60 * 1000 * 60; |
| // Store processing times of up to 60 minutes with 3 sig. fig. |
| descriptor->processing_time_distribution = |
| metrics_->RegisterMetric(new HistogramMetric(def, SIXTY_MINUTES_IN_MS, 3)); |
| it = method_map_.insert(make_pair(descriptor->name, descriptor)).first; |
| } |
| } |
| it->second->num_in_flight.Add(1); |
| // TODO: Consider pooling these |
| InvocationContext* ctxt_ptr = |
| new InvocationContext(MonotonicMillis(), cnxn_ctx, it->second); |
| VLOG_RPC << "RPC call: " << string(fn_name) << "(from " |
| << TNetworkAddressToString(ctxt_ptr->cnxn_ctx->network_address) << ")"; |
| return reinterpret_cast<void*>(ctxt_ptr); |
| } |
| |
| void RpcEventHandler::postWrite(void* ctx, const char* fn_name, uint32_t bytes) { |
| InvocationContext* rpc_ctx = reinterpret_cast<InvocationContext*>(ctx); |
| int64_t elapsed_time = MonotonicMillis() - rpc_ctx->start_time_ms; |
| const string& call_name = string(fn_name); |
| // TODO: bytes is always 0, how come? |
| VLOG_RPC << "RPC call: " << server_name_ << ":" << call_name << " from " |
| << TNetworkAddressToString(rpc_ctx->cnxn_ctx->network_address) << " took " |
| << PrettyPrinter::Print(elapsed_time * 1000L * 1000L, TUnit::TIME_NS); |
| MethodDescriptor* descriptor = rpc_ctx->method_descriptor; |
| delete rpc_ctx; |
| descriptor->num_in_flight.Add(-1); |
| descriptor->processing_time_distribution->Update(elapsed_time); |
| } |