blob: 8d5312f67fd68a9d35a8558e36bcd843af26796f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "rpc/rpc-mgr-test-base.h"
using kudu::rpc::ServiceIf;
using kudu::rpc::RpcController;
using kudu::rpc::RpcContext;
using kudu::MonoDelta;
DECLARE_int32(num_reactor_threads);
DECLARE_int32(num_acceptor_threads);
DECLARE_int32(rpc_negotiation_timeout_ms);
DECLARE_string(hostname);
namespace impala {
// For tests that do not require kerberized testing, we use RpcTest.
class RpcMgrTest : public RpcMgrTestBase<testing::Test> {
virtual void SetUp() {
RpcMgrTestBase::SetUp();
}
virtual void TearDown() {
RpcMgrTestBase::TearDown();
}
};
TEST_F(RpcMgrTest, MultipleServicesTls) {
// TODO: We're starting a seperate RpcMgr here instead of configuring
// RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce
// new gtest params to turn on TLS which needs to be a coordinated change across
// rpc-mgr-test and thrift-server-test.
RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
TNetworkAddress tls_krpc_address;
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT);
ASSERT_OK(tls_rpc_mgr.Init());
ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
tls_rpc_mgr.Shutdown();
}
TEST_F(RpcMgrTest, MultipleServices) {
ASSERT_OK(RunMultipleServicesTestTemplate(this, &rpc_mgr_, krpc_address_));
}
// Test with a misconfigured TLS certificate and verify that an error is thrown.
TEST_F(RpcMgrTest, BadCertificateTls) {
ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, "unknown");
RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
TNetworkAddress tls_krpc_address;
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
ASSERT_FALSE(tls_rpc_mgr.Init().ok());
tls_rpc_mgr.Shutdown();
}
// Test with a bad password command for the password protected private key.
TEST_F(RpcMgrTest, BadPasswordTls) {
ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT,
"echo badpassword");
RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
TNetworkAddress tls_krpc_address;
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
ASSERT_FALSE(tls_rpc_mgr.Init().ok());
tls_rpc_mgr.Shutdown();
}
// Test with a correct password command for the password protected private key.
TEST_F(RpcMgrTest, CorrectPasswordTls) {
ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT,
"echo password");
RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
TNetworkAddress tls_krpc_address;
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
ASSERT_OK(tls_rpc_mgr.Init());
ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
tls_rpc_mgr.Shutdown();
}
// Test with a bad TLS cipher and verify that an error is thrown.
TEST_F(RpcMgrTest, BadCiphersTls) {
ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", "not_a_cipher");
RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
TNetworkAddress tls_krpc_address;
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
ASSERT_FALSE(tls_rpc_mgr.Init().ok());
tls_rpc_mgr.Shutdown();
}
// Test with a valid TLS cipher.
TEST_F(RpcMgrTest, ValidCiphersTls) {
ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "",
TLS1_0_COMPATIBLE_CIPHER);
RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
TNetworkAddress tls_krpc_address;
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
ASSERT_OK(tls_rpc_mgr.Init());
ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
tls_rpc_mgr.Shutdown();
}
// Test with multiple valid TLS ciphers.
TEST_F(RpcMgrTest, ValidMultiCiphersTls) {
const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER,
TLS1_0_COMPATIBLE_CIPHER_2);
ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", cipher_list);
RpcMgr tls_rpc_mgr(IsInternalTlsConfigured());
TNetworkAddress tls_krpc_address;
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
int32_t tls_service_port = FindUnusedEphemeralPort(nullptr);
tls_krpc_address = MakeNetworkAddress(ip, tls_service_port);
ASSERT_OK(tls_rpc_mgr.Init());
ASSERT_OK(RunMultipleServicesTestTemplate(this, &tls_rpc_mgr, tls_krpc_address));
tls_rpc_mgr.Shutdown();
}
TEST_F(RpcMgrTest, SlowCallback) {
// Use a callback which is slow to respond.
auto slow_cb = [](RpcContext* ctx) {
SleepForMs(300);
ctx->RespondSuccess();
};
// Test a service which is slow to respond and has a short queue.
// Set a timeout on the client side. Expect either a client timeout
// or the service queue filling up.
ServiceIf* ping_impl = TakeOverService(make_unique<PingServiceImpl>(
rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker(), slow_cb));
const int num_service_threads = 1;
const int queue_size = 3;
ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl,
static_cast<PingServiceImpl*>(ping_impl)->mem_tracker()));
FLAGS_num_acceptor_threads = 2;
FLAGS_num_reactor_threads = 10;
ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
unique_ptr<PingServiceProxy> proxy;
ASSERT_OK(rpc_mgr_.GetProxy<PingServiceProxy>(krpc_address_, &proxy));
PingRequestPB request;
PingResponsePB response;
RpcController controller;
for (int i = 0; i < 100; ++i) {
controller.Reset();
controller.set_timeout(MonoDelta::FromMilliseconds(50));
kudu::Status status = proxy->Ping(request, &response, &controller);
ASSERT_TRUE(status.IsTimedOut() || RpcMgr::IsServerTooBusy(controller));
}
}
TEST_F(RpcMgrTest, AsyncCall) {
ServiceIf* scan_mem_impl = TakeOverService(make_unique<ScanMemServiceImpl>(
rpc_mgr_.metric_entity(), rpc_mgr_.result_tracker()));
ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl,
static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker()));
unique_ptr<ScanMemServiceProxy> scan_mem_proxy;
ASSERT_OK(rpc_mgr_.GetProxy<ScanMemServiceProxy>(krpc_address_, &scan_mem_proxy));
FLAGS_num_acceptor_threads = 2;
FLAGS_num_reactor_threads = 10;
ASSERT_OK(rpc_mgr_.StartServices(krpc_address_));
RpcController controller;
srand(0);
for (int i = 0; i < 10; ++i) {
controller.Reset();
ScanMemRequestPB request;
ScanMemResponsePB response;
SetupScanMemRequest(&request, &controller);
CountingBarrier barrier(1);
scan_mem_proxy->ScanMemAsync(request, &response, &controller,
[barrier_ptr = &barrier]() { barrier_ptr->Notify(); });
// TODO: Inject random cancellation here.
barrier.Wait();
ASSERT_TRUE(controller.status().ok()) << controller.status().ToString();
}
}
// Run a test with the negotiation timeout as 0 ms and ensure that connection
// establishment fails.
// This is to verify that FLAGS_rpc_negotiation_timeout_ms is actually effective.
TEST_F(RpcMgrTest, NegotiationTimeout) {
// Set negotiation timeout to 0 milliseconds.
auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_rpc_negotiation_timeout_ms, 0);
RpcMgr secondary_rpc_mgr(IsInternalTlsConfigured());
TNetworkAddress secondary_krpc_address;
IpAddr ip;
ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip));
int32_t secondary_service_port = FindUnusedEphemeralPort(nullptr);
secondary_krpc_address = MakeNetworkAddress(ip, secondary_service_port);
ASSERT_OK(secondary_rpc_mgr.Init());
ASSERT_FALSE(RunMultipleServicesTestTemplate(
this, &secondary_rpc_mgr, secondary_krpc_address).ok());
secondary_rpc_mgr.Shutdown();
}
} // namespace impala
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
// Fill in the path of the current binary for use by the tests.
CURRENT_EXECUTABLE_PATH = argv[0];
return RUN_ALL_TESTS();
}