// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "rpc/rpc-trace.h"

#include <boost/bind.hpp>
#include <boost/scoped_ptr.hpp>
#include <gutil/strings/substitute.h>

#include "common/logging.h"
#include "rpc/rpc-mgr.h"
#include "util/debug-util.h"
#include "util/histogram-metric.h"
#include "util/pretty-printer.h"
#include "util/time.h"
#include "util/webserver.h"

#include "common/names.h"

using namespace impala;
using namespace rapidjson;
using namespace strings;

// Metric key format for rpc call duration metrics. '$0' is substituted with
// "<server name>.<method name>" the first time a method is seen by
// RpcEventHandler::getContext().
const string RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY = "rpc-method.$0.call_duration";

// Singleton class to keep track of all RpcEventHandlers, and to render them to a
// web-based summary page. Registration and both web callbacks take 'lock_', so they are
// serialized with respect to each other.
class RpcEventHandlerManager {
 public:
  // 'rpc_mgr' may be null, in which case JsonCallback() only reports the registered
  // event handlers. Explicit so that an RpcMgr* is never implicitly converted into a
  // manager.
  explicit RpcEventHandlerManager(RpcMgr* rpc_mgr) : rpc_mgr_(rpc_mgr) {}

  // Adds an event handler to the list of those tracked. 'event_handler' must not be
  // null.
  void RegisterEventHandler(RpcEventHandler* event_handler);

  // Produces Json for /rpcz with summary information for all Rpc methods.
  // { "servers": [
  //  .. list of output from RpcEventHandler::ToJson()
  //  ] }
  // If an RpcMgr was supplied at construction, the output of RpcMgr::ToJson() is added
  // to 'document' as well.
  void JsonCallback(const Webserver::WebRequest& req, Document* document);

  // Resets method statistics. Takes two optional arguments: 'server' and 'method'. If
  // neither are specified, all server statistics are reset. If only 'server' is
  // specified, all statistics for the first registered handler with that server name are
  // reset. If only 'method' is specified, that method is reset on every server. If both
  // arguments are present, resets the statistics for a single method of the first
  // matching server only. Produces no JSON output.
  void ResetCallback(const Webserver::WebRequest& req, Document* document);

 private:
  // Protects event_handlers_
  mutex lock_;

  // List of all event handlers in the system - once an event handler is registered, it
  // should never be deleted. Event handlers are owned by the TProcessor which calls them,
  // which are themselves owned by a ThriftServer. Since we do not terminate ThriftServers
  // after they are started, event handlers have a lifetime equivalent to the length of
  // the process.
  vector<RpcEventHandler*> event_handlers_;

  // Points to an RpcMgr. If this is not null, then its metrics will be included in the
  // output of JsonCallback. Not owned, but the object must be guaranteed to live as long
  // as the process lives.
  RpcMgr* rpc_mgr_ = nullptr;
};

// Only instance of RpcEventHandlerManager. Null until InitRpcEventTracing() has been
// called; an RpcEventHandler constructed while this is null is not registered with any
// manager.
scoped_ptr<RpcEventHandlerManager> handler_manager;

// Creates the process-wide RpcEventHandlerManager (replacing any previous instance) and,
// when 'webserver' is non-null, registers the "/rpcz" and "/rpcz_reset" URL callbacks
// on it. 'rpc_mgr' is passed through to the manager unchanged.
void impala::InitRpcEventTracing(Webserver* webserver, RpcMgr* rpc_mgr) {
  handler_manager.reset(new RpcEventHandlerManager(rpc_mgr));
  if (webserver == nullptr) return;

  // Both callbacks are bound to the raw manager pointer owned by 'handler_manager'.
  RpcEventHandlerManager* const manager = handler_manager.get();

  Webserver::UrlCallback rpcz_callback =
      bind<void>(mem_fn(&RpcEventHandlerManager::JsonCallback), manager, _1, _2);
  webserver->RegisterUrlCallback("/rpcz", "rpcz.tmpl", rpcz_callback, true);

  Webserver::UrlCallback reset_callback =
      bind<void>(mem_fn(&RpcEventHandlerManager::ResetCallback), manager, _1, _2);
  webserver->RegisterUrlCallback("/rpcz_reset", "", reset_callback, false);
}

// Appends 'event_handler' (which must not be null) to the list of tracked handlers
// while holding 'lock_'.
void RpcEventHandlerManager::RegisterEventHandler(RpcEventHandler* event_handler) {
  DCHECK(event_handler != nullptr);
  lock_guard<mutex> guard(lock_);
  event_handlers_.emplace_back(event_handler);
}

// Adds a "servers" array to 'document' holding one object per registered event handler
// (as produced by RpcEventHandler::ToJson()), then lets the RpcMgr, if there is one,
// add its own output to 'document'. 'req' is not used. Holds 'lock_' throughout.
void RpcEventHandlerManager::JsonCallback(const Webserver::WebRequest& req,
    Document* document) {
  lock_guard<mutex> guard(lock_);
  auto& allocator = document->GetAllocator();

  Value server_list(kArrayType);
  for (RpcEventHandler* event_handler : event_handlers_) {
    Value server_json(kObjectType);
    event_handler->ToJson(&server_json, document);
    server_list.PushBack(server_json, allocator);
  }
  document->AddMember("servers", server_list, allocator);

  if (rpc_mgr_ == nullptr) return;
  rpc_mgr_->ToJson(document);
}

// Resets RPC statistics according to the optional 'server' and 'method' request
// arguments:
//  - no 'server': every registered handler is visited; otherwise only the first handler
//    whose server name equals 'server' is visited.
//  - no 'method': all methods of a visited handler are reset; otherwise only 'method'.
// Nothing is written to 'document'.
void RpcEventHandlerManager::ResetCallback(const Webserver::WebRequest& req,
    Document* document) {
  const auto& request_args = req.parsed_args;
  const Webserver::ArgumentMap::const_iterator server_arg = request_args.find("server");
  const Webserver::ArgumentMap::const_iterator method_arg = request_args.find("method");
  const bool has_server = server_arg != request_args.end();
  const bool has_method = method_arg != request_args.end();

  lock_guard<mutex> guard(lock_);
  for (RpcEventHandler* event_handler : event_handlers_) {
    // Skip handlers that belong to a server other than the requested one.
    if (has_server && event_handler->server_name() != server_arg->second) continue;

    if (has_method) {
      event_handler->Reset(method_arg->second);
    } else {
      event_handler->ResetAll();
    }

    // A specific server was requested and has now been handled.
    if (has_server) return;
  }
}

void RpcEventHandler::Reset(const string& method_name) {
  lock_guard<mutex> l(method_map_lock_);
  MethodMap::iterator it = method_map_.find(method_name);
  if (it == method_map_.end()) return;
  it->second->processing_time_distribution->Reset();
  it->second->num_in_flight.Store(0L);
}

void RpcEventHandler::ResetAll() {
  lock_guard<mutex> l(method_map_lock_);
  for (const MethodMap::value_type& method: method_map_) {
    method.second->processing_time_distribution->Reset();
    method.second->num_in_flight.Store(0L);
  }
}

// Records 'server_name' and 'metrics', then registers this handler with the global
// RpcEventHandlerManager if one has been created. Handlers constructed before the
// manager exists are not registered.
RpcEventHandler::RpcEventHandler(const string& server_name, MetricGroup* metrics)
  : server_name_(server_name),
    metrics_(metrics) {
  RpcEventHandlerManager* const manager = handler_manager.get();
  if (manager != nullptr) manager->RegisterEventHandler(this);
}

// Fills 'server' with a description of this handler:
//   "name": the server name
//   "methods": one object per recorded method with members "name", "summary" (the
//              human readable processing time distribution), "in_flight" and
//              "server_name"
// All values are built with the allocator of 'document'. Holds method_map_lock_ for the
// duration of the call.
void RpcEventHandler::ToJson(Value* server, Document* document) {
  lock_guard<mutex> guard(method_map_lock_);
  auto& allocator = document->GetAllocator();

  Value handler_name(server_name_.c_str(), allocator);
  server->AddMember("name", handler_name, allocator);

  Value method_list(kArrayType);
  for (const auto& entry : method_map_) {
    Value method_json(kObjectType);

    Value method_name(entry.first.c_str(), allocator);
    method_json.AddMember("name", method_name, allocator);

    const string& distribution =
        entry.second->processing_time_distribution->ToHumanReadable();
    Value summary(distribution.c_str(), allocator);
    method_json.AddMember("summary", summary, allocator);

    method_json.AddMember("in_flight", entry.second->num_in_flight.Load(), allocator);

    // Each method entry repeats the name of the server it belongs to.
    Value owning_server(server_name_.c_str(), allocator);
    method_json.AddMember("server_name", owning_server, allocator);

    method_list.PushBack(method_json, allocator);
  }
  server->AddMember("methods", method_list, allocator);
}

void* RpcEventHandler::getContext(const char* fn_name, void* server_context) {
  ThriftServer::ConnectionContext* cnxn_ctx =
      reinterpret_cast<ThriftServer::ConnectionContext*>(server_context);
  MethodMap::iterator it;
  {
    lock_guard<mutex> l(method_map_lock_);
    it = method_map_.find(fn_name);
    if (it == method_map_.end()) {
      MethodDescriptor* descriptor = new MethodDescriptor();
      descriptor->name = fn_name;
      const string& rpc_name = Substitute(RPC_PROCESSING_TIME_DISTRIBUTION_METRIC_KEY,
          Substitute("$0.$1", server_name_, descriptor->name));
      const TMetricDef& def =
          MakeTMetricDef(rpc_name, TMetricKind::HISTOGRAM, TUnit::TIME_MS);
      constexpr int32_t SIXTY_MINUTES_IN_MS = 60 * 1000 * 60;
      // Store processing times of up to 60 minutes with 3 sig. fig.
      descriptor->processing_time_distribution =
          metrics_->RegisterMetric(new HistogramMetric(def, SIXTY_MINUTES_IN_MS, 3));
      it = method_map_.insert(make_pair(descriptor->name, descriptor)).first;
    }
  }
  it->second->num_in_flight.Add(1);
  // TODO: Consider pooling these
  InvocationContext* ctxt_ptr =
      new InvocationContext(MonotonicMillis(), cnxn_ctx, it->second);
  VLOG_RPC << "RPC call: " << string(fn_name) << "(from "
           << TNetworkAddressToString(ctxt_ptr->cnxn_ctx->network_address) << ")";
  return reinterpret_cast<void*>(ctxt_ptr);
}

// Called when the RPC associated with 'ctx' has finished. 'ctx' must be the
// InvocationContext returned by getContext() for this call; it is deleted here.
// Logs the call at VLOG_RPC level, decrements the method's in-flight counter and adds
// the elapsed time (in milliseconds) to the method's processing time distribution.
// 'bytes' is not used.
void RpcEventHandler::postWrite(void* ctx, const char* fn_name, uint32_t bytes) {
  InvocationContext* rpc_ctx = static_cast<InvocationContext*>(ctx);
  const int64_t elapsed_time = MonotonicMillis() - rpc_ctx->start_time_ms;
  // TODO: bytes is always 0, how come?
  // 'fn_name' is streamed directly so that no std::string copy is built per call when
  // this log level is disabled.
  VLOG_RPC << "RPC call: " << server_name_ << ":" << fn_name << " from "
           << TNetworkAddressToString(rpc_ctx->cnxn_ctx->network_address) << " took "
           << PrettyPrinter::Print(elapsed_time * 1000L * 1000L, TUnit::TIME_NS);
  // Grab the descriptor before the context that points to it is freed.
  MethodDescriptor* descriptor = rpc_ctx->method_descriptor;
  delete rpc_ctx;
  descriptor->num_in_flight.Add(-1);
  descriptor->processing_time_distribution->Update(elapsed_time);
}
