blob: 4a79040388687e4a80e99481dd0d6fc10a0285c2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "rpc/rpc-mgr.inline.h"
#include "common/init.h"
#include "exec/kudu-util.h"
#include "kudu/rpc/rpc_context.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_sidecar.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "rpc/auth-provider.h"
#include "runtime/mem-tracker.h"
#include "testutil/gtest-util.h"
#include "testutil/mini-kdc-wrapper.h"
#include "testutil/scoped-flag-setter.h"
#include "util/counting-barrier.h"
#include "util/network-util.h"
#include "util/openssl-util.h"
#include "util/test-info.h"
#include "gen-cpp/rpc_test.proxy.h"
#include "gen-cpp/rpc_test.service.h"
#include "common/names.h"
using kudu::rpc::ServiceIf;
using kudu::rpc::RpcController;
using kudu::rpc::RpcContext;
using kudu::rpc::RpcSidecar;
using kudu::Slice;
using namespace std;
DECLARE_int32(num_reactor_threads);
DECLARE_int32(num_acceptor_threads);
DECLARE_string(hostname);
DECLARE_string(ssl_client_ca_certificate);
DECLARE_string(ssl_server_certificate);
DECLARE_string(ssl_private_key);
DECLARE_string(ssl_private_key_password_cmd);
DECLARE_string(ssl_cipher_list);
// The path of the current executable file that is required for passing into the SASL
// library as the 'application name'.
static string CURRENT_EXECUTABLE_PATH;
namespace impala {
// Port used to build the KRPC address the test fixture starts its services on. It is
// chosen once, when this variable is initialized at program start-up.
static int32_t SERVICE_PORT = FindUnusedEphemeralPort(nullptr);

// Asks FindUnusedEphemeralPort() for a port and hands it back to the caller. A
// non-fatal gtest failure is recorded if the helper reports -1; the value is returned
// either way, so callers still receive -1 in that case.
int GetServerPort() {
  const int port = FindUnusedEphemeralPort(nullptr);
  EXPECT_FALSE(port == -1);
  return port;
}
const static string IMPALA_HOME(getenv("IMPALA_HOME"));
const string& SERVER_CERT =
Substitute("$0/be/src/testutil/server-cert.pem", IMPALA_HOME);
const string& PRIVATE_KEY =
Substitute("$0/be/src/testutil/server-key.pem", IMPALA_HOME);
const string& BAD_SERVER_CERT =
Substitute("$0/be/src/testutil/bad-cert.pem", IMPALA_HOME);
const string& BAD_PRIVATE_KEY =
Substitute("$0/be/src/testutil/bad-key.pem", IMPALA_HOME);
const string& PASSWORD_PROTECTED_PRIVATE_KEY =
Substitute("$0/be/src/testutil/server-key-password.pem", IMPALA_HOME);
/// Scoped helper for the TLS-related flags: the constructor installs the supplied
/// values and the destructor clears all of them again.
/// The flags are not saved and restored; they are reset to the empty string, which
/// relies on the flags holding empty values by default.
class ScopedSetTlsFlags {
 public:
  ScopedSetTlsFlags(const string& cert, const string& pkey, const string& ca_cert,
      const string& pkey_passwd = "", const string& ciphers = "") {
    AssignFlags(cert, pkey, ca_cert, pkey_passwd, ciphers);
  }

  ~ScopedSetTlsFlags() { AssignFlags("", "", "", "", ""); }

 private:
  // Writes all five TLS flags in one go. Shared by the constructor and the destructor
  // so that both always touch exactly the same set of flags, in the same order.
  static void AssignFlags(const string& server_cert, const string& private_key,
      const string& client_ca_cert, const string& password_cmd,
      const string& cipher_list) {
    FLAGS_ssl_server_certificate = server_cert;
    FLAGS_ssl_private_key = private_key;
    FLAGS_ssl_client_ca_certificate = client_ca_cert;
    FLAGS_ssl_private_key_password_cmd = password_cmd;
    FLAGS_ssl_cipher_list = cipher_list;
  }
};
// Only use TLSv1.0 compatible ciphers, as tests might run on machines with only TLSv1.0
// support.
const string TLS1_0_COMPATIBLE_CIPHER = "RC4-SHA";
const string TLS1_0_COMPATIBLE_CIPHER_2 = "RC4-MD5";
// Size, in bytes, of the sidecar payload that the test fixture attaches to every
// ScanMem request. The payload is filled and scanned as an array of int32_t values.
#define PAYLOAD_SIZE (4096)
template <class T> class RpcMgrTestBase : public T {
public:
// Utility function to initialize the parameter for ScanMem RPC.
// Picks a random value and fills 'payload_' with it. Adds 'payload_' as a sidecar
// to 'controller'. Also sets up 'request' with the random value and index of the
// sidecar.
void SetupScanMemRequest(ScanMemRequestPB* request, RpcController* controller) {
int32_t pattern = random();
for (int i = 0; i < PAYLOAD_SIZE / sizeof(int32_t); ++i) payload_[i] = pattern;
int idx;
Slice slice(reinterpret_cast<const uint8_t*>(payload_), PAYLOAD_SIZE);
controller->AddOutboundSidecar(RpcSidecar::FromSlice(slice), &idx);
request->set_pattern(pattern);
request->set_sidecar_idx(idx);
}
// Takes over ownership of the newly created 'service' which needs to have a lifetime
// as long as 'rpc_mgr_' as RpcMgr::Shutdown() will call Shutdown() of 'service'.
ServiceIf* TakeOverService(std::unique_ptr<ServiceIf> service) {
services_.emplace_back(move(service));
return services_.back().get();
}
protected:
TNetworkAddress krpc_address_;
RpcMgr rpc_mgr_;
virtual void SetUp() {
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
krpc_address_ = MakeNetworkAddress(ip, SERVICE_PORT);
ASSERT_OK(rpc_mgr_.Init());
}
virtual void TearDown() {
rpc_mgr_.Shutdown();
}
private:
int32_t payload_[PAYLOAD_SIZE];
// Own all the services used by the test.
std::vector<std::unique_ptr<ServiceIf>> services_;
};
typedef std::function<void(RpcContext*)> ServiceCB;
class PingServiceImpl : public PingServiceIf {
public:
// 'cb' is a callback used by tests to inject custom behaviour into the RPC handler.
PingServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
const scoped_refptr<kudu::rpc::ResultTracker> tracker,
ServiceCB cb = [](RpcContext* ctx) { ctx->RespondSuccess(); })
: PingServiceIf(entity, tracker), mem_tracker_(-1, "Ping Service"), cb_(cb) {}
virtual void Ping(
const PingRequestPB* request, PingResponsePB* response, RpcContext* context) {
response->set_int_response(42);
// Incoming requests will already be tracked and we need to release the memory.
mem_tracker_.Release(context->GetTransferSize());
cb_(context);
}
MemTracker* mem_tracker() { return &mem_tracker_; }
private:
MemTracker mem_tracker_;
ServiceCB cb_;
};
// Test implementation of the ScanMem service, which verifies that the payload carried
// in the request's sidecar arrived uncorrupted.
class ScanMemServiceImpl : public ScanMemServiceIf {
 public:
  ScanMemServiceImpl(const scoped_refptr<kudu::MetricEntity>& entity,
      const scoped_refptr<kudu::rpc::ResultTracker> tracker)
    : ScanMemServiceIf(entity, tracker), mem_tracker_(-1, "ScanMem Service") {
  }

  // The request comes with an int 'pattern' and a payload of int array sent with
  // sidecar. Scan the array to make sure every element matches 'pattern'. Responds
  // with success if all elements match and with a Corruption status naming the
  // expected and the offending value on the first mismatch.
  // NOTE(review): the ASSERT_* checks presumably return from the handler on failure
  // (as gtest fatal assertions do), in which case the RPC is neither answered nor is
  // the tracked memory released - confirm whether that is acceptable.
  virtual void ScanMem(const ScanMemRequestPB* request, ScanMemResponsePB* response,
      RpcContext* context) {
    int32_t pattern = request->pattern();
    Slice payload;
    ASSERT_OK(
        FromKuduStatus(context->GetInboundSidecar(request->sidecar_idx(), &payload)));
    ASSERT_EQ(payload.size() % sizeof(int32_t), 0);

    const int32_t* v = reinterpret_cast<const int32_t*>(payload.data());
    const size_t num_values = payload.size() / sizeof(int32_t);
    for (size_t i = 0; i < num_values; ++i) {
      int32_t val = v[i];
      if (val != pattern) {
        // Incoming requests will already be tracked and we need to release the memory.
        mem_tracker_.Release(context->GetTransferSize());
        // Substitute() placeholders are zero-based: $0 is 'pattern', $1 is 'val'.
        context->RespondFailure(kudu::Status::Corruption(
            Substitute("Expecting $0; Found $1", pattern, val)));
        return;
      }
    }

    // Incoming requests will already be tracked and we need to release the memory.
    mem_tracker_.Release(context->GetTransferSize());
    context->RespondSuccess();
  }

  // Tracker that accounts for the memory of the requests sent to this service.
  MemTracker* mem_tracker() { return &mem_tracker_; }

 private:
  MemTracker mem_tracker_;
};
// Registers a Ping and a ScanMem service with 'rpc_mgr', starts them on 'krpc_address'
// and then issues 100 RPCs, each one randomly directed at one of the two services, to
// verify that a single RpcMgr can host several services at the same time.
// 'test_base' owns the created services and supplies the ScanMem payload.
// Returns the first error encountered, or Status::OK() if every RPC succeeded and
// every Ping() returned the expected value.
// Note: FLAGS_num_acceptor_threads and FLAGS_num_reactor_threads are overwritten here
// and are not restored afterwards.
template <class T>
Status RunMultipleServicesTestTemplate(RpcMgrTestBase<T>* test_base,
    RpcMgr* rpc_mgr, const TNetworkAddress& krpc_address) {
  // Test that a service can be started, and will respond to requests.
  ServiceIf* ping_impl = test_base->TakeOverService(make_unique<PingServiceImpl>(
      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, ping_impl,
      static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));

  // Test that a second service, that verifies the RPC payload is not corrupted,
  // can be started.
  ServiceIf* scan_mem_impl = test_base->TakeOverService(make_unique<ScanMemServiceImpl>(
      rpc_mgr->metric_entity(), rpc_mgr->result_tracker()));
  RETURN_IF_ERROR(rpc_mgr->RegisterService(10, 10, scan_mem_impl,
      static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));

  FLAGS_num_acceptor_threads = 2;
  FLAGS_num_reactor_threads = 10;
  RETURN_IF_ERROR(rpc_mgr->StartServices(krpc_address));

  unique_ptr<PingServiceProxy> ping_proxy;
  RETURN_IF_ERROR(rpc_mgr->GetProxy<PingServiceProxy>(krpc_address, &ping_proxy));

  unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
  RETURN_IF_ERROR(rpc_mgr->GetProxy<ScanMemServiceProxy>(krpc_address, &scan_mem_proxy));

  RpcController controller;
  // Seed the generator that random() actually draws from. srand() only affects rand()
  // and would leave the sequence used below unseeded.
  srandom(0);
  // Randomly invoke either services to make sure a RpcMgr can host multiple
  // services at the same time.
  for (int i = 0; i < 100; ++i) {
    // The controller is reused across iterations, so clear the state (including any
    // sidecar) left behind by the previous call.
    controller.Reset();
    if (random() % 2 == 0) {
      PingRequestPB request;
      PingResponsePB response;
      KUDU_RETURN_IF_ERROR(ping_proxy->Ping(request, &response, &controller),
          "unable to execute Ping() RPC.");
      if (response.int_response() != 42) {
        return Status(Substitute(
            "Ping() failed. Incorrect response. Expected: 42; Got: $0",
            response.int_response()));
      }
    } else {
      ScanMemRequestPB request;
      ScanMemResponsePB response;
      test_base->SetupScanMemRequest(&request, &controller);
      KUDU_RETURN_IF_ERROR(scan_mem_proxy->ScanMem(request, &response, &controller),
          "unable to execute ScanMem() RPC.");
    }
  }
  return Status::OK();
}
} // namespace impala