// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef IMPALA_RPC_RPC_MGR_TEST_H
#define IMPALA_RPC_RPC_MGR_TEST_H

#include "common/init.h"
#include "exec/kudu-util.h"
#include "kudu/rpc/remote_user.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/util/status.h"
#include "rpc/rpc-mgr.inline.h"
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "testutil/gtest-util.h"
#include "testutil/scoped-flag-setter.h"
#include "util/auth-util.h"
#include "util/network-util.h"
#include "util/openssl-util.h"
#include "util/test-info.h"

#include "gen-cpp/rpc_test.proxy.h"
#include "gen-cpp/rpc_test.service.h"
#include "gen-cpp/rpc_test.pb.h"

#include "common/names.h"

using kudu::rpc::GeneratedServiceIf;
using kudu::rpc::RemoteUser;
using kudu::rpc::RpcController;
using kudu::rpc::RpcContext;
using kudu::rpc::RpcSidecar;
using kudu::Slice;

using namespace std;

DECLARE_int32(num_reactor_threads);
DECLARE_int32(num_acceptor_threads);
DECLARE_string(hostname);

DECLARE_string(ssl_client_ca_certificate);
DECLARE_string(ssl_server_certificate);
DECLARE_string(ssl_private_key);
DECLARE_string(ssl_private_key_password_cmd);
DECLARE_string(ssl_cipher_list);

// The path of the current executable file that is required for passing into the SASL
// library as the 'application name'.
static string CURRENT_EXECUTABLE_PATH;

namespace impala {

static int32_t SERVICE_PORT = FindUnusedEphemeralPort();

const static string IMPALA_HOME(getenv("IMPALA_HOME"));
const string& SERVER_CERT =
    Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
const string& PRIVATE_KEY =
    Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
const string& BAD_SERVER_CERT =
    Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
const string& BAD_PRIVATE_KEY =
    Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
const string& PASSWORD_PROTECTED_PRIVATE_KEY =
    Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);

/// Use this class to set the appropriate required TLS flags for the duration of the
/// lifetime of the object.
/// It is assumed that the flags always hold empty values by default.
class ScopedSetTlsFlags {
 public:
  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& ca_cert,
      const string& pkey_passwd = "", const string& ciphers = "") {
    FLAGS_ssl_server_certificate = cert;
    FLAGS_ssl_private_key = pkey;
    FLAGS_ssl_client_ca_certificate = ca_cert;
    FLAGS_ssl_private_key_password_cmd = pkey_passwd;
    FLAGS_ssl_cipher_list = ciphers;
  }

  ~ScopedSetTlsFlags() {
    FLAGS_ssl_server_certificate = "";
    FLAGS_ssl_private_key = "";
    FLAGS_ssl_client_ca_certificate = "";
    FLAGS_ssl_private_key_password_cmd = "";
    FLAGS_ssl_cipher_list = "";
  }
};

// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
// support.
const string TLS1_0_COMPATIBLE_CIPHER = "AES128-SHA";
const string TLS1_0_COMPATIBLE_CIPHER_2 = "AES256-SHA";

#define PAYLOAD_SIZE (4096)

class RpcMgrTest : public testing::Test {
 public:
  // Utility function to initialize the parameter for ScanMem RPC.
  // Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
  // to 'controller'. Also sets up 'request' with the random value and index of the
  // sidecar.
  void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
    int32_t pattern = random();
    for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
    int idx;
    Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
    controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
    request->set_pattern(pattern);
    request->set_sidecar_idx(idx);
  }

  // Utility function which alternately makes requests to PingService and ScanMemService.
  Status RunMultipleServicesTest(RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address);

 protected:
  TNetworkAddress krpc_address_;
  RpcMgr rpc_mgr_;

  virtual void SetUp() {
    IpAddr ip;
    ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
    krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
    exec_env_.reset(new ExecEnv());
    ASSERT_OK(rpc_mgr_.Init());
  }

  virtual void TearDown() {
    rpc_mgr_.Shutdown();
  }

  // Takes over ownership of the newly created 'service' which needs to have a lifetime
  // as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
  GeneratedServiceIf* TakeOverService(std::unique_ptr<GeneratedServiceIf> service) {
    services_.emplace_back(move(service));
    return services_.back().get();
  }

 private:
  int32_t payload_[PAYLOAD_SIZE];

  // Own all the services used by the test.
  std::vector<std::unique_ptr<GeneratedServiceIf>> services_;

  // Required to set up the RPC metric groups used by the service pool.
  std::unique_ptr<ExecEnv> exec_env_;
};

typedef std::function<void(RpcContext*)> ServiceCB;

class PingServiceImpl : public PingServiceIf {
 public:
  // 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
  PingServiceImpl(RpcMgr* rpc_mgr,
      ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
    : PingServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
      rpc_mgr_(rpc_mgr),
      mem_tracker_(-1, "Ping Service"),
      cb_(cb) {}

  Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
      std::unique_ptr<PingServiceProxy>* proxy) {
    return rpc_mgr_->GetProxy(address, hostname, proxy);
  }

  virtual bool Authorize(const google::protobuf::Message* req,
      google::protobuf::Message* resp, RpcContext* context) override {
    if (!IsKerberosEnabled()) {
      const RemoteUser& remote_user = context->remote_user();
      if (remote_user.username() != "impala") {
        mem_tracker_.Release(context->GetTransferSize());
        context->RespondFailure(kudu::Status::NotAuthorized(
            Substitute("$0 is not allowed to access PingService",
                remote_user.ToString())));
        return false;
      }
      return true;
    } else {
      return rpc_mgr_->Authorize("PingService", context, mem_tracker());
    }
  }

  virtual void Ping(const PingRequestPB* request, PingResponsePB* response, RpcContext*
      context) override {
    response->set_int_response(42);
    // Incoming requests will already be tracked and we need to release the memory.
    mem_tracker_.Release(context->GetTransferSize());
    cb_(context);
  }

  MemTracker* mem_tracker() { return &mem_tracker_; }

 private:
  RpcMgr* rpc_mgr_;
  MemTracker mem_tracker_;
  ServiceCB cb_;
};

class ScanMemServiceImpl : public ScanMemServiceIf {
 public:
  ScanMemServiceImpl(RpcMgr* rpc_mgr)
    : ScanMemServiceIf(rpc_mgr->metric_entity(), rpc_mgr->result_tracker()),
      rpc_mgr_(rpc_mgr),
      mem_tracker_(-1, "ScanMem Service") {
  }

  Status GetProxy(const TNetworkAddress& address, const std::string& hostname,
      std::unique_ptr<ScanMemServiceProxy>* proxy) {
    return rpc_mgr_->GetProxy(address, hostname, proxy);
  }

  // A no-op authorization function.
  virtual bool Authorize(const google::protobuf::Message* req,
      google::protobuf::Message* resp, RpcContext* context) override {
    return true;
  }

  // The request comes with an int 'pattern' and a payload of int array sent with
  // sidecar. Scan the array to make sure every element matches 'pattern'.
  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
      RpcContext* context) override {
    int32_t pattern = request->pattern();
    Slice payload;
    ASSERT_OK(
        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);

    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
    for (int i = 0; i < payload.size() / sizeof(int32_t); ++i) {
      int32_t val = v[i];
      if (val != pattern) {
        // Incoming requests will already be tracked and we need to release the memory.
        mem_tracker_.Release(context->GetTransferSize());
        context->RespondFailure(kudu::Status::Corruption(
            Substitute("Expecting $1; Found $2", pattern, val)));
        return;
      }
    }
    // Incoming requests will already be tracked and we need to release the memory.
    mem_tracker_.Release(context->GetTransferSize());
    context->RespondSuccess();
  }

  MemTracker* mem_tracker() { return &mem_tracker_; }

 private:
  RpcMgr* rpc_mgr_;
  MemTracker mem_tracker_;

};

/// A class that behaves like a ::kudu::rpc::Proxy and keeps a count of the number of
/// times it is called. It always fails by returning an IOError.
class FailingPingServiceProxy {
 public:
  kudu::Status Ping(const class PingRequestPB& req, class PingResponsePB* resp,
      ::kudu::rpc::RpcController* controller) {
    ++number_of_calls_;
    return kudu::Status::IOError(
        Substitute("ping failing, number of calls=$0.", number_of_calls_));
  }

  /// Return the number of times Ping has been called.
  int GetNumberOfCalls() const { return number_of_calls_; }

 private:
  /// Number of times Ping has been called.
  int number_of_calls_ = 0;
};

Status RpcMgrTest::RunMultipleServicesTest(
    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
  // Test that a service can be started, and will respond to requests.
  GeneratedServiceIf* ping_impl = TakeOverService(
      make_unique<PingServiceImpl>(rpc_mgr));
  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));

  // Test that a second service, that verifies the RPC payload is not corrupted,
  // can be started.
  GeneratedServiceIf* scan_mem_impl =
      TakeOverService(make_unique<ScanMemServiceImpl>(rpc_mgr));

  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));

  FLAGS_num_acceptor_threads = 2;
  FLAGS_num_reactor_threads = 10;
  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));

  unique_ptr<PingServiceProxy> ping_proxy;
  RETURN_IF_ERROR(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(krpc_address,
      FLAGS_hostname, &ping_proxy));

  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
  RETURN_IF_ERROR(static_cast<ScanMemServiceImpl*>(scan_mem_impl)->GetProxy(krpc_address,
      FLAGS_hostname, &scan_mem_proxy));

  RpcController controller;
  srand(0);
  // Randomly invoke either services to make sure a RpcMgr can host multiple
  // services at the same time.
  for (int i = 0; i < 100; ++i) {
    controller.Reset();
    if (random() % 2 == 0) {
      PingRequestPB request;
      PingResponsePB response;
      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
          "unable to execute Ping() RPC.");
      if (response.int_response() != 42) {
        return Status(
            Substitute("Ping() failed. Incorrect response. Expected: 42; Got: $0",
                response.int_response()));
      }
    } else {
      ScanMemRequestPB request;
      ScanMemResponsePB response;
      SetupScanMemRequest(&request, &controller);
      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
          "unable to execute ScanMem() RPC.");
    }
  }
  return Status::OK();
}

} // namespace impala

#endif
