| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "rpc/rpc-mgr-test.h" |
| |
| #include "kudu/util/monotime.h" |
| #include "service/fe-support.h" |
| #include "testutil/mini-kdc-wrapper.h" |
| #include "util/counting-barrier.h" |
| |
| using kudu::rpc::GeneratedServiceIf; |
| using kudu::rpc::RpcController; |
| using kudu::rpc::RpcContext; |
| using kudu::MonoDelta; |
| |
| DECLARE_int32(num_reactor_threads); |
| DECLARE_int32(num_acceptor_threads); |
| DECLARE_int32(rpc_negotiation_timeout_ms); |
| DECLARE_string(hostname); |
| DECLARE_string(debug_actions); |
| |
| // For tests that do not require kerberized testing, we use RpcTest. |
| namespace impala { |
| |
| // Test multiple services managed by an Rpc Manager using TLS. |
| TEST_F(RpcMgrTest, MultipleServicesTls) { |
| // TODO: We're starting a separate RpcMgr here instead of configuring |
| // RpcTestBase::rpc_mgr_ to use TLS. To use RpcTestBase::rpc_mgr_, we need to introduce |
| // new gtest params to turn on TLS which needs to be a coordinated change across |
| // rpc-mgr-test and thrift-server-test. |
| RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); |
| TNetworkAddress tls_krpc_address; |
| IpAddr ip; |
| ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); |
| |
| int32_t tls_service_port = FindUnusedEphemeralPort(); |
| tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); |
| |
| ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT); |
| ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address)); |
| |
| ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address)); |
| tls_rpc_mgr.Shutdown(); |
| } |
| |
| // Test multiple services managed by an Rpc Manager. |
| TEST_F(RpcMgrTest, MultipleServices) { |
| ASSERT_OK(RunMultipleServicesTest(&rpc_mgr_, krpc_address_)); |
| } |
| |
| // Test with a misconfigured TLS certificate and verify that an error is thrown. |
| TEST_F(RpcMgrTest, BadCertificateTls) { |
| ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, "unknown"); |
| |
| RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); |
| TNetworkAddress tls_krpc_address; |
| IpAddr ip; |
| ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); |
| |
| int32_t tls_service_port = FindUnusedEphemeralPort(); |
| tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); |
| |
| ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok()); |
| tls_rpc_mgr.Shutdown(); |
| } |
| |
| // Test with a bad password command for the password protected private key. |
| TEST_F(RpcMgrTest, BadPasswordTls) { |
| ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT, |
| "echo badpassword"); |
| |
| RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); |
| TNetworkAddress tls_krpc_address; |
| IpAddr ip; |
| ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); |
| |
| int32_t tls_service_port = FindUnusedEphemeralPort(); |
| tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); |
| |
| ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok()); |
| tls_rpc_mgr.Shutdown(); |
| } |
| |
| // Test with a correct password command for the password protected private key. |
| TEST_F(RpcMgrTest, CorrectPasswordTls) { |
| ScopedSetTlsFlags s(SERVER_CERT, PASSWORD_PROTECTED_PRIVATE_KEY, SERVER_CERT, |
| "echo password"); |
| |
| RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); |
| TNetworkAddress tls_krpc_address; |
| IpAddr ip; |
| ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); |
| |
| int32_t tls_service_port = FindUnusedEphemeralPort(); |
| tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); |
| |
| ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address)); |
| ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address)); |
| tls_rpc_mgr.Shutdown(); |
| } |
| |
| // Test with a bad TLS cipher and verify that an error is thrown. |
| TEST_F(RpcMgrTest, BadCiphersTls) { |
| ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", "not_a_cipher"); |
| |
| RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); |
| TNetworkAddress tls_krpc_address; |
| IpAddr ip; |
| ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); |
| |
| int32_t tls_service_port = FindUnusedEphemeralPort(); |
| tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); |
| |
| ASSERT_FALSE(tls_rpc_mgr.Init(tls_krpc_address).ok()); |
| tls_rpc_mgr.Shutdown(); |
| } |
| |
| // Test with a valid TLS cipher. |
| TEST_F(RpcMgrTest, ValidCiphersTls) { |
| ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", |
| TLS1_0_COMPATIBLE_CIPHER); |
| |
| RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); |
| TNetworkAddress tls_krpc_address; |
| IpAddr ip; |
| ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); |
| |
| int32_t tls_service_port = FindUnusedEphemeralPort(); |
| tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); |
| |
| ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address)); |
| ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address)); |
| tls_rpc_mgr.Shutdown(); |
| } |
| |
| // Test with multiple valid TLS ciphers. |
| TEST_F(RpcMgrTest, ValidMultiCiphersTls) { |
| const string cipher_list = Substitute("$0,$1", TLS1_0_COMPATIBLE_CIPHER, |
| TLS1_0_COMPATIBLE_CIPHER_2); |
| ScopedSetTlsFlags s(SERVER_CERT, PRIVATE_KEY, SERVER_CERT, "", cipher_list); |
| |
| RpcMgr tls_rpc_mgr(IsInternalTlsConfigured()); |
| TNetworkAddress tls_krpc_address; |
| IpAddr ip; |
| ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); |
| |
| int32_t tls_service_port = FindUnusedEphemeralPort(); |
| tls_krpc_address = MakeNetworkAddress(ip, tls_service_port); |
| |
| ASSERT_OK(tls_rpc_mgr.Init(tls_krpc_address)); |
| ASSERT_OK(RunMultipleServicesTest(&tls_rpc_mgr, tls_krpc_address)); |
| tls_rpc_mgr.Shutdown(); |
| } |
| |
| // Test behavior with a slow service. |
| TEST_F(RpcMgrTest, SlowCallback) { |
| // Use a callback which is slow to respond. |
| auto slow_cb = [](RpcContext* ctx) { |
| SleepForMs(300); |
| ctx->RespondSuccess(); |
| }; |
| |
| // Test a service which is slow to respond and has a short queue. |
| // Set a timeout on the client side. Expect either a client timeout |
| // or the service queue filling up. |
| GeneratedServiceIf* ping_impl = |
| TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_, slow_cb)); |
| const int num_service_threads = 1; |
| const int queue_size = 3; |
| ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl, |
| static_cast<PingServiceImpl*>(ping_impl)->mem_tracker())); |
| |
| FLAGS_num_acceptor_threads = 2; |
| FLAGS_num_reactor_threads = 10; |
| ASSERT_OK(rpc_mgr_.StartServices()); |
| |
| unique_ptr<PingServiceProxy> proxy; |
| ASSERT_OK(static_cast<PingServiceImpl*>(ping_impl)->GetProxy(krpc_address_, |
| FLAGS_hostname, &proxy)); |
| |
| PingRequestPB request; |
| PingResponsePB response; |
| RpcController controller; |
| for (int i = 0; i < 100; ++i) { |
| controller.Reset(); |
| controller.set_timeout(MonoDelta::FromMilliseconds(50)); |
| kudu::Status status = proxy->Ping(request, &response, &controller); |
| ASSERT_TRUE(status.IsTimedOut() || RpcMgr::IsServerTooBusy(controller)); |
| } |
| } |
| |
| // Test async calls. |
| TEST_F(RpcMgrTest, AsyncCall) { |
| GeneratedServiceIf* scan_mem_impl = |
| TakeOverService(make_unique<ScanMemServiceImpl>(&rpc_mgr_)); |
| ASSERT_OK(rpc_mgr_.RegisterService(10, 10, scan_mem_impl, |
| static_cast<ScanMemServiceImpl*>(scan_mem_impl)->mem_tracker())); |
| |
| unique_ptr<ScanMemServiceProxy> scan_mem_proxy; |
| ASSERT_OK(static_cast<ScanMemServiceImpl*>(scan_mem_impl)->GetProxy(krpc_address_, |
| FLAGS_hostname, &scan_mem_proxy)); |
| |
| FLAGS_num_acceptor_threads = 2; |
| FLAGS_num_reactor_threads = 10; |
| ASSERT_OK(rpc_mgr_.StartServices()); |
| |
| RpcController controller; |
| srand(0); |
| for (int i = 0; i < 10; ++i) { |
| controller.Reset(); |
| ScanMemRequestPB request; |
| ScanMemResponsePB response; |
| SetupScanMemRequest(&request, &controller); |
| CountingBarrier barrier(1); |
| scan_mem_proxy->ScanMemAsync( |
| request, &response, &controller, [barrier_ptr = &barrier]() { |
| discard_result(barrier_ptr->Notify()); |
| }); |
| // TODO: Inject random cancellation here. |
| barrier.Wait(); |
| ASSERT_TRUE(controller.status().ok()) << controller.status().ToString(); |
| } |
| } |
| |
| // Run a test with the negotiation timeout as 0 ms and ensure that connection |
| // establishment fails. |
| // This is to verify that FLAGS_rpc_negotiation_timeout_ms is actually effective. |
| TEST_F(RpcMgrTest, NegotiationTimeout) { |
| // Set negotiation timeout to 0 milliseconds. |
| auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_rpc_negotiation_timeout_ms, 0); |
| |
| RpcMgr secondary_rpc_mgr(IsInternalTlsConfigured()); |
| TNetworkAddress secondary_krpc_address; |
| IpAddr ip; |
| ASSERT_OK(HostnameToIpAddr(FLAGS_hostname, &ip)); |
| |
| int32_t secondary_service_port = FindUnusedEphemeralPort(); |
| secondary_krpc_address = MakeNetworkAddress(ip, secondary_service_port); |
| |
| ASSERT_OK(secondary_rpc_mgr.Init(secondary_krpc_address)); |
| ASSERT_FALSE(RunMultipleServicesTest(&secondary_rpc_mgr, secondary_krpc_address).ok()); |
| secondary_rpc_mgr.Shutdown(); |
| } |
| |
| // Test RpcMgr::DoRpcWithRetry using a fake proxy. |
| TEST_F(RpcMgrTest, DoRpcWithRetry) { |
| TQueryCtx query_ctx; |
| const int num_retries = 10; |
| const int64_t timeout_ms = 10 * MILLIS_PER_SEC; |
| |
| // Test how DoRpcWithRetry retries by using a proxy that always fails. |
| unique_ptr<FailingPingServiceProxy> failing_proxy = |
| make_unique<FailingPingServiceProxy>(); |
| // A call that fails is not retried as the server is not busy. |
| PingRequestPB request1; |
| PingResponsePB response1; |
| Status rpc_status_fail = |
| RpcMgr::DoRpcWithRetry(failing_proxy, &FailingPingServiceProxy::Ping, request1, |
| &response1, query_ctx, "ping failed", num_retries, timeout_ms); |
| ASSERT_FALSE(rpc_status_fail.ok()); |
| // Check that proxy was only called once. |
| ASSERT_EQ(1, failing_proxy->GetNumberOfCalls()); |
| |
| // Test injection of DebugAction into DoRpcWithRetry. |
| query_ctx.client_request.query_options.__set_debug_action("DoRpcWithRetry:FAIL"); |
| PingRequestPB request2; |
| PingResponsePB response2; |
| Status inject_status = RpcMgr::DoRpcWithRetry(failing_proxy, |
| &FailingPingServiceProxy::Ping, request2, &response2, query_ctx, "ping failed", |
| num_retries, timeout_ms, 0, "DoRpcWithRetry"); |
| ASSERT_FALSE(inject_status.ok()); |
| EXPECT_ERROR(inject_status, TErrorCode::INTERNAL_ERROR); |
| ASSERT_EQ("Debug Action: DoRpcWithRetry:FAIL", inject_status.msg().msg()); |
| } |
| |
| // Test RpcMgr::DoRpcWithRetry by injecting service-too-busy failures. |
| TEST_F(RpcMgrTest, BusyService) { |
| TQueryCtx query_ctx; |
| auto cb = [](RpcContext* ctx) { ctx->RespondSuccess(); }; |
| GeneratedServiceIf* ping_impl = |
| TakeOverService(make_unique<PingServiceImpl>(&rpc_mgr_, cb)); |
| const int num_service_threads = 4; |
| const int queue_size = 25; |
| ASSERT_OK(rpc_mgr_.RegisterService(num_service_threads, queue_size, ping_impl, |
| static_cast<PingServiceImpl*>(ping_impl)->mem_tracker())); |
| FLAGS_num_acceptor_threads = 2; |
| FLAGS_num_reactor_threads = 10; |
| ASSERT_OK(rpc_mgr_.StartServices()); |
| |
| // Find the counter which tracks the number of times the service queue is too full. |
| const string& overflow_count = Substitute( |
| ImpalaServicePool::RPC_QUEUE_OVERFLOW_METRIC_KEY, ping_impl->service_name()); |
| IntCounter* overflow_metric = |
| ExecEnv::GetInstance()->rpc_metrics()->FindMetricForTesting<IntCounter>( |
| overflow_count); |
| ASSERT_TRUE(overflow_metric != nullptr); |
| |
| unique_ptr<PingServiceProxy> proxy; |
| ASSERT_OK(static_cast<PingServiceImpl*>(ping_impl)->GetProxy( |
| krpc_address_, FLAGS_hostname, &proxy)); |
| |
| // There have been no overflows yet. |
| EXPECT_EQ(overflow_metric->GetValue(), 0L); |
| |
| // Use DebugAction to make the Impala Service Pool reject 50% of Krpc calls as if the |
| // service is too busy. |
| auto s = ScopedFlagSetter<string>::Make(&FLAGS_debug_actions, |
| Substitute("IMPALA_SERVICE_POOL:$0:$1:Ping:FAIL@0.5@REJECT_TOO_BUSY", |
| krpc_address_.hostname, krpc_address_.port)); |
| PingRequestPB request; |
| PingResponsePB response; |
| const int64_t timeout_ms = 10 * MILLIS_PER_SEC; |
| int num_retries = 40; // How many times DoRpcWithRetry can retry. |
| int num_rpc_retry_calls = 40; // How many times to call DoRpcWithRetry |
| for (int i = 0; i < num_rpc_retry_calls; ++i) { |
| Status status = RpcMgr::DoRpcWithRetry(proxy, &PingServiceProxy::Ping, request, |
| &response, query_ctx, "ping failed", num_retries, timeout_ms); |
| // DoRpcWithRetry will fail with probability (1/2)^num_rpc_retry_calls. |
| ASSERT_TRUE(status.ok()); |
| } |
| // There will be no overflows (i.e. service too busy) with probability |
| // (1/2)^num_retries. |
| ASSERT_GT(overflow_metric->GetValue(), 0); |
| } |
| |
| } // namespace impala |
| |
| int main(int argc, char** argv) { |
| ::testing::InitGoogleTest(&argc, argv); |
| impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); |
| impala::InitFeSupport(); |
| |
| // Fill in the path of the current binary for use by the tests. |
| CURRENT_EXECUTABLE_PATH = argv[0]; |
| return RUN_ALL_TESTS(); |
| } |