// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <atomic>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <limits>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <glog/stl_logging.h>
#include <gtest/gtest.h>

#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/proxy.h"
#include "kudu/rpc/rpc-test-base.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/rpc/rpc_introspection.pb.h"
#include "kudu/rpc/rpcz_store.h"
#include "kudu/rpc/rtest.pb.h"
#include "kudu/rpc/rtest.proxy.h"
#include "kudu/rpc/service_pool.h"
#include "kudu/rpc/user_credentials.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/env.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
#include "kudu/util/subprocess.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
#include "kudu/util/thread_restrictions.h"
#include "kudu/util/user.h"
DEFINE_bool(is_panic_test_child, false, "Used by TestRpcPanic");
DECLARE_bool(socket_inject_short_recvs);
using kudu::pb_util::SecureDebugString;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using base::subtle::NoBarrier_Load;
namespace kudu {
namespace rpc {
class RpcStubTest : public RpcTestBase {
public:
void SetUp() override {
RpcTestBase::SetUp();
// Use a shorter queue length since some tests below need to start enough
// threads to saturate the queue.
service_queue_length_ = 10;
ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr_));
ASSERT_OK(CreateMessenger("Client", &client_messenger_));
}
protected:
void SendSimpleCall() {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
RpcController controller;
AddRequestPB req;
req.set_x(10);
req.set_y(20);
AddResponsePB resp;
ASSERT_OK(p.Add(req, &resp, &controller));
ASSERT_EQ(30, resp.result());
}
Sockaddr server_addr_;
shared_ptr<Messenger> client_messenger_;
};

TEST_F(RpcStubTest, TestSimpleCall) {
SendSimpleCall();
}
// Regression test for a bug in which we would not properly parse a call
// response when recv() returned a 'short read'. This injects such short
// reads and then makes a number of calls.
TEST_F(RpcStubTest, TestShortRecvs) {
FLAGS_socket_inject_short_recvs = true;
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
for (int i = 0; i < 100; i++) {
NO_FATALS(SendSimpleCall());
}
}
// Test calls which are rather large.
// This test sends many of them at once using the async API and then
// waits for them all to return. This is meant to ensure that the
// IO threads can deal with read/write calls that don't succeed
// in sending the entire data in one go.
TEST_F(RpcStubTest, TestBigCallData) {
const int kNumSentAtOnce = 20;
const size_t kMessageSize = 5 * 1024 * 1024;
string data;
data.resize(kMessageSize);
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
EchoRequestPB req;
req.set_data(data);
vector<unique_ptr<EchoResponsePB>> resps;
vector<unique_ptr<RpcController>> controllers;
CountDownLatch latch(kNumSentAtOnce);
for (int i = 0; i < kNumSentAtOnce; i++) {
resps.emplace_back(new EchoResponsePB);
controllers.emplace_back(new RpcController);
p.EchoAsync(req, resps.back().get(), controllers.back().get(),
[&latch]() { latch.CountDown(); });
}
latch.Wait();
for (const auto& c : controllers) {
ASSERT_OK(c->status());
}
}
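
// Test an RPC whose response the service defers (the handler asks the server
// to respond later rather than replying inline).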
TEST_F(RpcStubTest, TestRespondDeferred) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
RpcController controller;
SleepRequestPB req;
req.set_sleep_micros(1000);
req.set_deferred(true);
SleepResponsePB resp;
ASSERT_OK(p.Sleep(req, &resp, &controller));
}
// Test that the default user credentials are propagated to the server.
TEST_F(RpcStubTest, TestDefaultCredentialsPropagated) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
string expected;
ASSERT_OK(GetLoggedInUser(&expected));
RpcController controller;
WhoAmIRequestPB req;
WhoAmIResponsePB resp;
ASSERT_OK(p.WhoAmI(req, &resp, &controller));
ASSERT_EQ(expected, resp.credentials().real_user());
ASSERT_FALSE(resp.credentials().has_effective_user());
}
// Test that the user can specify other credentials.
TEST_F(RpcStubTest, TestCustomCredentialsPropagated) {
const char* const kFakeUserName = "some fake user";
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
UserCredentials creds;
creds.set_real_user(kFakeUserName);
p.set_user_credentials(creds);
RpcController controller;
WhoAmIRequestPB req;
WhoAmIResponsePB resp;
ASSERT_OK(p.WhoAmI(req, &resp, &controller));
ASSERT_EQ(kFakeUserName, resp.credentials().real_user());
ASSERT_FALSE(resp.credentials().has_effective_user());
}
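
// Test that per-method authorization is enforced: "alice" is rejected by every
// method, while "bob" may call WhoAmI() but not Sleep().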
TEST_F(RpcStubTest, TestAuthorization) {
// First test calling WhoAmI() as user "alice", who is disallowed.
{
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
UserCredentials creds;
creds.set_real_user("alice");
p.set_user_credentials(creds);
// Alice is disallowed from calling any of the RPCs.
{
RpcController controller;
WhoAmIRequestPB req;
WhoAmIResponsePB resp;
Status s = p.WhoAmI(req, &resp, &controller);
ASSERT_FALSE(s.ok());
ASSERT_EQ(s.ToString(),
"Remote error: Not authorized: alice is not allowed to call this method");
}
// KUDU-2540: Authorization failures on exactly-once RPCs cause FATAL
{
RpcController controller;
unique_ptr<RequestIdPB> request_id(new RequestIdPB);
request_id->set_client_id("client-id");
request_id->set_attempt_no(0);
request_id->set_seq_no(0);
request_id->set_first_incomplete_seq_no(-1);
controller.SetRequestIdPB(std::move(request_id));
ExactlyOnceRequestPB req;
req.set_value_to_add(1);
ExactlyOnceResponsePB resp;
Status s = p.AddExactlyOnce(req, &resp, &controller);
ASSERT_FALSE(s.ok());
ASSERT_EQ(s.ToString(),
"Remote error: Not authorized: alice is not allowed to call this method");
}
}
// Try some calls as "bob".
{
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
UserCredentials creds;
creds.set_real_user("bob");
p.set_user_credentials(creds);
// "bob" is allowed to call WhoAmI().
{
RpcController controller;
WhoAmIRequestPB req;
WhoAmIResponsePB resp;
ASSERT_OK(p.WhoAmI(req, &resp, &controller));
}
// "bob" is not allowed to call "Sleep".
{
RpcController controller;
SleepRequestPB req;
req.set_sleep_micros(10);
SleepResponsePB resp;
Status s = p.Sleep(req, &resp, &controller);
ASSERT_EQ(s.ToString(),
"Remote error: Not authorized: bob is not allowed to call this method");
}
}
}
// Test that the user's remote address is accessible to the server.
TEST_F(RpcStubTest, TestRemoteAddress) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
RpcController controller;
WhoAmIRequestPB req;
WhoAmIResponsePB resp;
ASSERT_OK(p.WhoAmI(req, &resp, &controller));
ASSERT_STR_CONTAINS(resp.address(), "127.0.0.1:");
}
////////////////////////////////////////////////////////////
// Tests for error cases
////////////////////////////////////////////////////////////
// Test sending a PB parameter with a missing field, where the client
// thinks it has sent a full PB (e.g. due to a version mismatch).
TEST_F(RpcStubTest, TestCallWithInvalidParam) {
Proxy p(client_messenger_, server_addr_, server_addr_.host(),
CalculatorService::static_service_name());
rpc_test::AddRequestPartialPB req;
req.set_x(rand());
// AddRequestPartialPB is missing the 'y' field.
AddResponsePB resp;
RpcController controller;
Status s = p.SyncRequest("Add", req, &resp, &controller);
ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(),
"Invalid argument: invalid parameter for call "
"kudu.rpc_test.CalculatorService.Add: "
"missing fields: y");
}
// Wrapper around AtomicIncrement, since AtomicIncrement returns the 'old'
// value, and our callback needs to be a void function.
static void DoIncrement(Atomic32* count) {
base::subtle::Barrier_AtomicIncrement(count, 1);
}
// Test sending a PB parameter with a missing field on the client side.
// This also ensures that the async callback is only called once
// (regression test for a previously-encountered bug).
TEST_F(RpcStubTest, TestCallWithMissingPBFieldClientSide) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
RpcController controller;
AddRequestPB req;
req.set_x(10);
// Request is missing the 'y' field.
AddResponsePB resp;
Atomic32 callback_count = 0;
p.AddAsync(req, &resp, &controller, [&callback_count]() { DoIncrement(&callback_count); });
while (NoBarrier_Load(&callback_count) == 0) {
SleepFor(MonoDelta::FromMicroseconds(10));
}
SleepFor(MonoDelta::FromMicroseconds(100));
ASSERT_EQ(1, NoBarrier_Load(&callback_count));
ASSERT_STR_CONTAINS(controller.status().ToString(),
"Invalid argument: invalid parameter for call "
"kudu.rpc_test.CalculatorService.Add: missing fields: y");
}
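
// Test that the client rejects a server response which is missing a required
// field.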
TEST_F(RpcStubTest, TestResponseWithMissingField) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
RpcController rpc;
TestInvalidResponseRequestPB req;
TestInvalidResponseResponsePB resp;
req.set_error_type(rpc_test::TestInvalidResponseRequestPB_ErrorType_MISSING_REQUIRED_FIELD);
Status s = p.TestInvalidResponse(req, &resp, &rpc);
ASSERT_STR_CONTAINS(s.ToString(),
"invalid RPC response, missing fields: response");
}
// Test case where the server responds with a message which is larger than the maximum
// configured RPC message size. The server should send the response, but the client
// will reject it.
TEST_F(RpcStubTest, TestResponseLargerThanFrameSize) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
RpcController rpc;
TestInvalidResponseRequestPB req;
TestInvalidResponseResponsePB resp;
req.set_error_type(rpc_test::TestInvalidResponseRequestPB_ErrorType_RESPONSE_TOO_LARGE);
Status s = p.TestInvalidResponse(req, &resp, &rpc);
ASSERT_STR_CONTAINS(s.ToString(), "Network error: RPC frame had a length of");
}
// Test sending a call which isn't implemented by the server.
TEST_F(RpcStubTest, TestCallMissingMethod) {
Proxy p(client_messenger_, server_addr_, server_addr_.host(),
CalculatorService::static_service_name());
Status s = DoTestSyncCall(&p, "DoesNotExist");
ASSERT_TRUE(s.IsRemoteError()) << "Bad status: " << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "with an invalid method name: DoesNotExist");
}
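
// Test that an application-specific error raised by the service is surfaced as
// a remote error, with the full error protobuf (including extensions)
// available via RpcController::error_response().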
TEST_F(RpcStubTest, TestApplicationError) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
RpcController controller;
SleepRequestPB req;
SleepResponsePB resp;
req.set_sleep_micros(1);
req.set_return_app_error(true);
Status s = p.Sleep(req, &resp, &controller);
ASSERT_TRUE(s.IsRemoteError()) << s.ToString();
EXPECT_EQ("Remote error: Got some error", s.ToString());
EXPECT_EQ("message: \"Got some error\"\n"
"[kudu.rpc_test.CalculatorError.app_error_ext] {\n"
" extra_error_data: \"some application-specific error data\"\n"
"}\n",
SecureDebugString(*controller.error_response()));
}
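
// Test the behavior when a service method panics: the process hosting the
// server should abort (SIGABRT) after logging "Test method panicking!" to
// stderr.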
TEST_F(RpcStubTest, TestRpcPanic) {
if (!FLAGS_is_panic_test_child) {
// This is a poor man's death test. We re-invoke this same
// test case with the above flag set, and verify that the
// child process aborts. gtest death tests don't work here
// because threads have already been started up.
vector<string> argv;
string executable_path;
CHECK_OK(env_->GetExecutablePath(&executable_path));
argv.push_back(executable_path);
argv.emplace_back("--is_panic_test_child");
argv.emplace_back("--gtest_filter=RpcStubTest.TestRpcPanic");
Subprocess subp(argv);
subp.ShareParentStderr(false);
CHECK_OK(subp.Start());
FILE* in = fdopen(subp.from_child_stderr_fd(), "r");
PCHECK(in);
// Search for the string "Test method panicking!" somewhere in stderr.
char buf[1024];
bool found_string = false;
while (fgets(buf, sizeof(buf), in)) {
if (strstr(buf, "Test method panicking!")) {
found_string = true;
break;
}
}
CHECK(found_string);
// Check return status
int wait_status = 0;
CHECK_OK(subp.Wait(&wait_status));
CHECK(!WIFEXITED(wait_status)); // should not have been successful
if (WIFSIGNALED(wait_status)) {
CHECK_EQ(WTERMSIG(wait_status), SIGABRT);
} else {
// On some systems, we get exit status 134 from SIGABRT rather than
// WIFSIGNALED getting flagged.
CHECK_EQ(WEXITSTATUS(wait_status), 134);
}
return;
} else {
// Before forcing the panic, explicitly remove the test directory. This
// should be safe; this test doesn't generate any data.
CHECK_OK(env_->DeleteRecursively(test_dir_));
// Make an RPC which causes the server to abort.
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
RpcController controller;
PanicRequestPB req;
PanicResponsePB resp;
p.Panic(req, &resp, &controller);
}
}
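
// Bundles the controller, request, response, and completion latch for one
// asynchronous Sleep call.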
struct AsyncSleep {
AsyncSleep() : latch(1) {}
RpcController rpc;
SleepRequestPB req;
SleepResponsePB resp;
CountDownLatch latch;
};
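
// Test that a call which times out while waiting in the service queue is not
// handled by the server, but instead gets short-circuited and counted by the
// queue-timeout metric.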
TEST_F(RpcStubTest, TestDontHandleTimedOutCalls) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
vector<unique_ptr<AsyncSleep>> sleeps;
sleeps.reserve(n_worker_threads_);
// Send enough sleep calls to occupy the worker threads.
for (int i = 0; i < n_worker_threads_; i++) {
unique_ptr<AsyncSleep> sleep(new AsyncSleep);
sleep->rpc.set_timeout(MonoDelta::FromSeconds(1));
sleep->req.set_sleep_micros(1000*1000); // 1sec
auto& l = sleep->latch;
p.SleepAsync(sleep->req, &sleep->resp, &sleep->rpc,
[&l]() { l.CountDown(); });
sleeps.emplace_back(std::move(sleep));
}
// We asynchronously sent the RPCs above, but the RPCs might still
// be in the queue. Because the RPC we send next has a lower timeout,
// it would take priority over the long-timeout RPCs. So, we have to
// wait until the above RPCs are being processed before we continue
// the test.
const Histogram* queue_time_metric = service_pool_->IncomingQueueTimeMetricForTests();
while (queue_time_metric->TotalCount() < n_worker_threads_) {
SleepFor(MonoDelta::FromMilliseconds(1));
}
// Send another call with a short timeout. This shouldn't get processed, because
// it'll get stuck in the queue for longer than its timeout.
ASSERT_EVENTUALLY([&]() {
RpcController rpc;
SleepRequestPB req;
SleepResponsePB resp;
req.set_sleep_micros(1); // unused but required.
rpc.set_timeout(MonoDelta::FromMilliseconds(5));
Status s = p.Sleep(req, &resp, &rpc);
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
// Since our timeout was short, it's possible in rare circumstances
// that we time out the RPC on the outbound queue, in which case
// we won't trigger the desired behavior here. In that case, the
// timeout error status would have the string 'ON_OUTBOUND_QUEUE'
// instead of 'SENT', so this assertion would fail and cause the
// ASSERT_EVENTUALLY to loop.
ASSERT_STR_CONTAINS(s.ToString(), "SENT");
});
for (const auto& s : sleeps) {
s->latch.Wait();
}
// Verify that the timed-out call got short-circuited before being processed.
// We may need to loop a short amount of time as we are racing with the reactor
// thread to process the remaining elements of the queue.
const Counter* timed_out_in_queue = service_pool_->RpcsTimedOutInQueueMetricForTests();
ASSERT_EVENTUALLY([&]{
ASSERT_EQ(1, timed_out_in_queue->value());
});
}
// Test which ensures that the RPC queue accepts requests with the earliest
// deadline first (EDF), and upon overflow rejects requests with the latest deadlines.
//
// In particular, this simulates a workload experienced with Impala where the local
// impalad would spawn more scanner threads than the total number of handlers plus queue
// slots, guaranteeing that some of those clients would see SERVER_TOO_BUSY rejections on
// scan requests and be forced to back off and retry. Without EDF scheduling, we saw that
// the "unlucky" threads that got rejected would likely continue to get rejected upon
// retries, and some would be starved continually until they missed their overall deadline
// and failed the query.
//
// With EDF scheduling, the retries take priority over the original requests (because
// they retain their original deadlines). This prevents starvation of unlucky threads.
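//
// For example, if the queue holds calls whose deadlines are now+8s, now+3s, and
// now+6s, the now+3s call is dispatched first; and if another call arrives while
// the queue is full, the call with the latest deadline is the one rejected with
// SERVER_TOO_BUSY.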
TEST_F(RpcStubTest, TestEarliestDeadlineFirstQueue) {
const int num_client_threads = service_queue_length_ + n_worker_threads_ + 5;
vector<std::thread> threads;
vector<int> successes(num_client_threads);
std::atomic<bool> done(false);
for (int thread_id = 0; thread_id < num_client_threads; thread_id++) {
threads.emplace_back([&, thread_id] {
Random rng(thread_id);
CalculatorServiceProxy p(
client_messenger_, server_addr_, server_addr_.host());
while (!done.load()) {
// Set a deadline in the future. We'll keep using this same deadline
// on each of our retries.
MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(8);
for (int attempt = 1; !done.load(); attempt++) {
RpcController controller;
SleepRequestPB req;
SleepResponsePB resp;
controller.set_deadline(deadline);
req.set_sleep_micros(100000);
Status s = p.Sleep(req, &resp, &controller);
if (s.ok()) {
successes[thread_id]++;
break;
}
// We expect to get SERVER_TOO_BUSY errors because we have more clients than the
// server has handlers and queue slots. No other errors are expected.
CHECK(s.IsRemoteError() &&
controller.error_response()->code() == rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY)
<< "Unexpected RPC failure: " << s.ToString();
// Randomized exponential backoff (similar to that done by the scanners in the
// Kudu client).
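// For example, on attempt 3 this yields a random backoff in [4ms, 8ms); the
// multiplier is capped at 1000, so the backoff never exceeds one second.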
int backoff = (0.5 + rng.NextDoubleFraction() * 0.5) * (std::min(1 << attempt, 1000));
VLOG(1) << "backoff " << backoff << "ms";
SleepFor(MonoDelta::FromMilliseconds(backoff));
}
}
});
}
// Let the threads run for 5 seconds before stopping them.
SleepFor(MonoDelta::FromSeconds(5));
done.store(true);
for (auto& t : threads) {
t.join();
}
// Before switching to earliest-deadline-first scheduling, the results
// here would typically look something like:
// 1 1 0 1 10 17 6 1 12 12 17 10 8 7 12 9 16 15
// With the fix, we see something like:
// 9 9 9 8 9 9 9 9 9 9 9 9 9 9 9 9 9
LOG(INFO) << "thread RPC success counts: " << successes;
int sum = 0;
int min = std::numeric_limits<int>::max();
for (int x : successes) {
sum += x;
min = std::min(min, x);
}
int avg = sum / successes.size();
ASSERT_GT(min, avg / 2)
<< "expected the least lucky thread to have at least half as many successes "
<< "as the average thread: min=" << min << " avg=" << avg;
}
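
// Test dumping the calls currently in flight: start an async Sleep call, then
// use DumpConnections() on both the client and server messengers and verify
// that the call shows up with its method name, elapsed time, and trace.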
TEST_F(RpcStubTest, TestDumpCallsInFlight) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
AsyncSleep sleep;
sleep.req.set_sleep_micros(100 * 1000); // 100ms
auto& l = sleep.latch;
p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
[&l]() { l.CountDown(); });
// Check the running RPC status on the client messenger.
DumpConnectionsRequestPB dump_req;
DumpConnectionsResponsePB dump_resp;
dump_req.set_include_traces(true);
ASSERT_OK(client_messenger_->DumpConnections(dump_req, &dump_resp));
LOG(INFO) << "client messenger: " << SecureDebugString(dump_resp);
ASSERT_EQ(1, dump_resp.outbound_connections_size());
ASSERT_EQ(1, dump_resp.outbound_connections(0).calls_in_flight_size());
ASSERT_EQ("Sleep", dump_resp.outbound_connections(0).calls_in_flight(0).
header().remote_method().method_name());
ASSERT_GT(dump_resp.outbound_connections(0).calls_in_flight(0).micros_elapsed(), 0);
// And the server messenger.
// We have to loop this until we find a result, since the actual call is sent
// asynchronously off of the main thread (i.e. the server may not be handling it yet).
for (int i = 0; i < 100; i++) {
dump_resp.Clear();
ASSERT_OK(server_messenger_->DumpConnections(dump_req, &dump_resp));
if (dump_resp.inbound_connections_size() > 0 &&
dump_resp.inbound_connections(0).calls_in_flight_size() > 0) {
break;
}
SleepFor(MonoDelta::FromMilliseconds(1));
}
LOG(INFO) << "server messenger: " << SecureDebugString(dump_resp);
ASSERT_EQ(1, dump_resp.inbound_connections_size());
ASSERT_EQ(1, dump_resp.inbound_connections(0).calls_in_flight_size());
ASSERT_EQ("Sleep", dump_resp.inbound_connections(0).calls_in_flight(0).
header().remote_method().method_name());
ASSERT_GT(dump_resp.inbound_connections(0).calls_in_flight(0).micros_elapsed(), 0);
ASSERT_STR_CONTAINS(dump_resp.inbound_connections(0).calls_in_flight(0).trace_buffer(),
"Inserting onto call queue");
sleep.latch.Wait();
}
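
// Test that completed calls are sampled by the server's RpczStore and that the
// samples (including trace metrics and a request summary) can be dumped.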
TEST_F(RpcStubTest, TestDumpSampledCalls) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
// Issue two calls that fall into different latency buckets.
AsyncSleep sleeps[2];
sleeps[0].req.set_sleep_micros(150 * 1000); // 150ms
sleeps[1].req.set_sleep_micros(1500 * 1000); // 1500ms
for (auto& sleep : sleeps) {
auto& l = sleep.latch;
p.SleepAsync(sleep.req, &sleep.resp, &sleep.rpc,
[&l]() { l.CountDown(); });
}
for (auto& sleep : sleeps) {
sleep.latch.Wait();
}
// Dump the sampled RPCs and expect to see the calls
// above.
DumpRpczStoreResponsePB sampled_rpcs;
server_messenger_->rpcz_store()->DumpPB(DumpRpczStoreRequestPB(), &sampled_rpcs);
EXPECT_EQ(sampled_rpcs.methods_size(), 1);
ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs),
" metrics {\n"
" key: \"test_sleep_us\"\n"
" value: 150000\n"
" }\n");
ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs),
" metrics {\n"
" key: \"test_sleep_us\"\n"
" value: 1500000\n"
" }\n");
ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs),
" metrics {\n"
" child_path: \"test_child\"\n"
" key: \"related_trace_metric\"\n"
" value: 1\n"
" }");
ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), "SleepRequestPB");
ASSERT_STR_CONTAINS(SecureDebugString(sampled_rpcs), "duration_ms");
}
namespace {
struct RefCountedTest : public RefCountedThreadSafe<RefCountedTest> {
};
// Test callback which takes a refcounted pointer.
// The pointer itself is unused; it exists to validate that the bound callback
// (and thus the reference it holds) is cleared in TestCallbackClearedAfterRunning.
void MyTestCallback(CountDownLatch* latch, scoped_refptr<RefCountedTest> my_refptr) {
latch->CountDown();
}
} // anonymous namespace
// Verify that, after a call has returned, no copy of the call's callback
// is held. This is important when the callback holds a refcounted ptr,
// since we expect to be able to release that pointer when the call is done.
TEST_F(RpcStubTest, TestCallbackClearedAfterRunning) {
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
CountDownLatch latch(1);
scoped_refptr<RefCountedTest> my_refptr(new RefCountedTest);
RpcController controller;
AddRequestPB req;
req.set_x(10);
req.set_y(20);
AddResponsePB resp;
p.AddAsync(req, &resp, &controller,
[&latch, my_refptr]() { MyTestCallback(&latch, my_refptr); });
latch.Wait();
// The ref count should go back down to 1. However, we need to loop a little
// bit, since the deref is happening on another thread. If the other thread gets
// descheduled directly after calling our callback, we'd fail without these sleeps.
for (int i = 0; i < 100 && !my_refptr->HasOneRef(); i++) {
SleepFor(MonoDelta::FromMilliseconds(1));
}
ASSERT_TRUE(my_refptr->HasOneRef());
}
// Regression test for KUDU-1409: if the client reactor thread is blocked (e.g. due to a
// process-wide pause or a slow callback), RPC calls should not spuriously time out.
TEST_F(RpcStubTest, DontTimeOutWhenReactorIsBlocked) {
CHECK_EQ(client_messenger_->num_reactors(), 1)
<< "This test requires only a single reactor. Otherwise the injected sleep might "
<< "be scheduled on a different reactor than the RPC call.";
CalculatorServiceProxy p(client_messenger_, server_addr_, server_addr_.host());
// Schedule a 1-second sleep on the reactor thread.
//
// This will cause the reactor to be blocked while the call response is received,
// and it will still be blocked when the timeout would normally occur. Despite
// this, the call should not time out.
//
// Approximate timeline:
//   0.0s: the Sleep RPC is issued with a 1.2s timeout.
//   0.5s: the reactor thread starts a 1-second sleep and is blocked.
//   1.2s: the RPC would normally time out, but the reactor is still blocked.
//   1.5s: the reactor wakes up and processes the response (which arrived while
//         it was blocked), so the call completes successfully.
client_messenger_->ScheduleOnReactor([](const Status& s) {
ThreadRestrictions::ScopedAllowWait allow_wait;
SleepFor(MonoDelta::FromSeconds(1));
}, MonoDelta::FromSeconds(0.5));
RpcController controller;
SleepRequestPB req;
SleepResponsePB resp;
req.set_sleep_micros(800 * 1000);
controller.set_timeout(MonoDelta::FromMilliseconds(1200));
ASSERT_OK(p.Sleep(req, &resp, &controller));
}
} // namespace rpc
} // namespace kudu