// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/rpc/rpc_context.h"

#include <memory>
#include <ostream>
#include <utility>

#include <glog/logging.h>
#include <google/protobuf/message.h>

#include "kudu/rpc/connection.h"
#include "kudu/rpc/inbound_call.h"
#include "kudu/rpc/remote_method.h"
#include "kudu/rpc/remote_user.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/util/debug/trace_event.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/trace.h"

using google::protobuf::Message;
using kudu::pb_util::SecureDebugString;
using std::string;
using std::unique_ptr;

namespace kudu {

class Slice;

namespace rpc {

RpcContext::RpcContext(InboundCall *call,
                       const google::protobuf::Message *request_pb,
                       google::protobuf::Message *response_pb)
  : call_(CHECK_NOTNULL(call)),
    request_pb_(request_pb),
    response_pb_(response_pb) {
  VLOG(4) << call_->remote_method().service_name() << ": Received RPC request for "
          << call_->ToString() << ":" << std::endl << SecureDebugString(*request_pb_);
  TRACE_EVENT_ASYNC_BEGIN2("rpc_call", "RPC", this,
                           "call", call_->ToString(),
                           "request", pb_util::PbTracer::TracePb(*request_pb_));
}

RpcContext::~RpcContext() {
}

void RpcContext::SetResultTracker(scoped_refptr<ResultTracker> result_tracker) {
  DCHECK(!result_tracker_);
  result_tracker_ = std::move(result_tracker);
}

void RpcContext::RespondSuccess() {
  if (AreResultsTracked()) {
    result_tracker_->RecordCompletionAndRespond(call_->header().request_id(),
                                                response_pb_);
  } else {
    VLOG(4) << call_->remote_method().service_name() << ": Sending RPC success response for "
        << call_->ToString() << ":" << std::endl << SecureDebugString(*response_pb_);
    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
                           "response", pb_util::PbTracer::TracePb(*response_pb_),
                           "trace", trace()->DumpToString());
    call_->RespondSuccess(*response_pb_);
    delete this;
  }
}

void RpcContext::RespondNoCache() {
  if (AreResultsTracked()) {
    result_tracker_->FailAndRespond(call_->header().request_id(),
                                    response_pb_);
  } else {
    VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for "
        << call_->ToString() << ": " << SecureDebugString(*response_pb_);
    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
                           "response", pb_util::PbTracer::TracePb(*response_pb_),
                           "trace", trace()->DumpToString());
    // This is a bit counter intuitive, but when we get the failure but set the error on the
    // call's response we call RespondSuccess() instead of RespondFailure().
    call_->RespondSuccess(*response_pb_);
    delete this;
  }
}

void RpcContext::RespondFailure(const Status &status) {
  return RespondRpcFailure(ErrorStatusPB::ERROR_APPLICATION, status);
}

void RpcContext::RespondRpcFailure(ErrorStatusPB_RpcErrorCodePB err, const Status& status) {
  if (AreResultsTracked()) {
    result_tracker_->FailAndRespond(call_->header().request_id(),
                                    err, status);
  } else {
    VLOG(4) << call_->remote_method().service_name() << ": Sending RPC failure response for "
        << call_->ToString() << ": " << status.ToString();
    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
                           "status", status.ToString(),
                           "trace", trace()->DumpToString());
    call_->RespondFailure(err, status);
    delete this;
  }
}

void RpcContext::RespondApplicationError(int error_ext_id, const std::string& message,
                                         const Message& app_error_pb) {
  if (AreResultsTracked()) {
    result_tracker_->FailAndRespond(call_->header().request_id(),
                                    error_ext_id, message, app_error_pb);
  } else {
    if (VLOG_IS_ON(4)) {
      ErrorStatusPB err;
      InboundCall::ApplicationErrorToPB(error_ext_id, message, app_error_pb, &err);
      VLOG(4) << call_->remote_method().service_name()
          << ": Sending application error response for " << call_->ToString()
          << ":" << std::endl << SecureDebugString(err);
    }
    TRACE_EVENT_ASYNC_END2("rpc_call", "RPC", this,
                           "response", pb_util::PbTracer::TracePb(app_error_pb),
                           "trace", trace()->DumpToString());
    call_->RespondApplicationError(error_ext_id, message, app_error_pb);
    delete this;
  }
}

const rpc::RequestIdPB* RpcContext::request_id() const {
  return call_->header().has_request_id() ? &call_->header().request_id() : nullptr;
}

size_t RpcContext::GetTransferSize() const {
  return call_->GetTransferSize();
}

Status RpcContext::AddOutboundSidecar(unique_ptr<RpcSidecar> car, int* idx) {
  return call_->AddOutboundSidecar(std::move(car), idx);
}

Status RpcContext::GetInboundSidecar(int idx, Slice* slice) const {
  return call_->GetInboundSidecar(idx, slice);
}

const RemoteUser& RpcContext::remote_user() const {
  return call_->remote_user();
}

bool RpcContext::is_confidential() const {
  return call_->connection()->is_confidential();
}

void RpcContext::DiscardTransfer() {
  call_->DiscardTransfer();
}

const Sockaddr& RpcContext::remote_address() const {
  return call_->remote_address();
}

std::string RpcContext::requestor_string() const {
  return call_->remote_user().ToString() + " at " +
    call_->remote_address().ToString();
}

const std::string& RpcContext::method_name() const {
  return call_->remote_method().method_name();
}

const std::string& RpcContext::service_name() const {
  return call_->remote_method().service_name();
}

MonoTime RpcContext::GetClientDeadline() const {
  return call_->GetClientDeadline();
}

MonoTime RpcContext::GetTimeReceived() const {
  return call_->GetTimeReceived();
}

MonoTime RpcContext::GetTimeHandled() const {
  return call_->GetTimeHandled();
}

Trace* RpcContext::trace() const {
  return call_->trace();
}

void RpcContext::Panic(const char* filepath, int line_number, const string& message) {
  // Use the LogMessage class directly so that the log messages appear to come from
  // the line of code which caused the panic, not this code.
#define MY_ERROR google::LogMessage(filepath, line_number, google::GLOG_ERROR).stream()
#define MY_FATAL google::LogMessageFatal(filepath, line_number).stream()

  MY_ERROR << "Panic handling " << call_->ToString() << ": " << message;
  MY_ERROR << "Request:\n" << SecureDebugString(*request_pb_);
  Trace* t = trace();
  if (t) {
    MY_ERROR << "RPC trace:";
    t->Dump(&MY_ERROR, true);
  }
  MY_FATAL << "Exiting due to panic.";

#undef MY_ERROR
#undef MY_FATAL
}


} // namespace rpc
} // namespace kudu
