// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstddef>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/acceptor_pool.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/proxy.h"
#include "kudu/rpc/rpc-test-base.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_service.h"
#include "kudu/rpc/rtest.pb.h"
#include "kudu/rpc/service_if.h"
#include "kudu/rpc/service_pool.h"
#include "kudu/util/barrier.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Add);
METRIC_DECLARE_counter(queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_counter(rpc_connections_accepted);
METRIC_DECLARE_counter(rpcs_queue_overflow);
using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace rpc {
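// Test fixture exercising the RPC layer under concurrent multi-threaded client load.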
class MultiThreadedRpcTest : public RpcTestBase {
public:
// Make a single RPC call.
void SingleCall(Sockaddr server_addr, const string& method_name,
Status* result, CountDownLatch* latch) {
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
CHECK_OK(CreateMessenger("ClientSC", &client_messenger));
Proxy p(client_messenger, server_addr, server_addr.host(),
GenericCalculatorService::static_service_name());
*result = DoTestSyncCall(&p, method_name);
latch->CountDown();
}
// Make RPC calls until we see a failure.
void HammerServer(Sockaddr server_addr, const string& method_name,
Status* last_result) {
shared_ptr<Messenger> client_messenger;
CHECK_OK(CreateMessenger("ClientHS", &client_messenger));
HammerServerWithMessenger(server_addr, method_name, last_result, client_messenger);
}
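// Like HammerServer(), but use the caller-provided messenger for all calls.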
void HammerServerWithMessenger(
Sockaddr server_addr, const string& method_name, Status* last_result,
const shared_ptr<Messenger>& messenger) {
LOG(INFO) << "Connecting to " << server_addr.ToString();
Proxy p(messenger, server_addr, server_addr.host(),
GenericCalculatorService::static_service_name());
int i = 0;
while (true) {
i++;
Status s = DoTestSyncCall(&p, method_name);
if (!s.ok()) {
// Return on first failure.
LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: "
<< s.ToString();
*last_result = s;
return;
}
}
}
};
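// Join the thread and assert that its final call status indicates the server shut down.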
static void AssertShutdown(thread* thread, const Status* status) {
thread->join();
string msg = status->ToString();
ASSERT_TRUE(msg.find("Service unavailable") != string::npos ||
msg.find("Network error") != string::npos)
<< "Status is actually: " << msg;
}
// Test making several concurrent RPC calls while shutting down the server.
// Simply verify that we don't hit any CHECK failures.
TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) {
// Set up server.
Sockaddr server_addr;
ASSERT_OK(StartTestServer(&server_addr));
constexpr int kNumThreads = 4;
thread threads[kNumThreads];
Status statuses[kNumThreads];
for (int i = 0; i < kNumThreads; i++) {
auto* my_status = &statuses[i];
threads[i] = thread([this, server_addr, my_status]() {
this->HammerServer(server_addr, GenericCalculatorService::kAddMethodName, my_status);
});
}
SleepFor(MonoDelta::FromMilliseconds(50));
// Shut down server.
ASSERT_OK(server_messenger_->UnregisterService(service_name_));
service_pool_->Shutdown();
server_messenger_->Shutdown();
for (int i = 0; i < kNumThreads; i++) {
AssertShutdown(&threads[i], &statuses[i]);
}
}
// Test shutting down the client messenger exactly as a thread is about to start
// a new connection. This is a regression test for KUDU-104.
TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) {
// Set up server.
Sockaddr server_addr;
ASSERT_OK(StartTestServer(&server_addr));
shared_ptr<Messenger> client_messenger;
ASSERT_OK(CreateMessenger("Client", &client_messenger));
Status status;
thread thread([this, server_addr, &status, client_messenger]() {
this->HammerServerWithMessenger(server_addr, GenericCalculatorService::kAddMethodName,
&status, client_messenger);
});
// Shut down the messenger after a very brief sleep. This often races such that the
// call gets submitted to the messenger before shutdown, but connection negotiation
// hasn't started yet. In a debug build this fails about half the time without the
// bug fix. See KUDU-104.
SleepFor(MonoDelta::FromMicroseconds(10));
client_messenger->Shutdown();
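// Also drop the local reference to the messenger while the call may still be pending.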
client_messenger.reset();
thread.join();
ASSERT_TRUE(status.IsAborted() ||
status.IsServiceUnavailable());
string msg = status.ToString();
SCOPED_TRACE(msg);
ASSERT_TRUE(msg.find("Client RPC Messenger shutting down") != string::npos ||
msg.find("reactor is shutting down") != string::npos ||
msg.find("Unable to start connection negotiation thread") != string::npos)
<< "Status is actually: " << msg;
}
// This bogus service pool never starts any worker threads, so the service queue
// fills up and stays full.
class BogusServicePool : public ServicePool {
public:
BogusServicePool(unique_ptr<ServiceIf> service,
const scoped_refptr<MetricEntity>& metric_entity,
size_t service_queue_length)
: ServicePool(std::move(service), metric_entity, service_queue_length) {
}
virtual Status Init(int num_threads) OVERRIDE {
// Do nothing: never start any worker threads, so the queue is never drained.
return Status::OK();
}
};
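// Classify 'status' as either a backpressure rejection (service queue full) or a
// shutdown-related failure, incrementing the matching counter. Any other status
// fails the test.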
void IncrementBackpressureOrShutdown(const Status* status, int* backpressure, int* shutdown) {
string msg = status->ToString();
if (msg.find("service queue is full") != string::npos) {
++(*backpressure);
} else if (msg.find("shutting down") != string::npos) {
++(*shutdown);
} else if (msg.find("recv got EOF from") != string::npos) {
++(*shutdown);
} else {
FAIL() << "Unexpected status message: " << msg;
}
}
// Test that we get a Service Unavailable error when we max out the incoming RPC service queue.
TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
const size_t kMaxConcurrency = 2;
MessengerBuilder bld("messenger1");
bld.set_num_reactors(kMaxConcurrency);
bld.set_metric_entity(metric_entity_);
CHECK_OK(bld.Build(&server_messenger_));
shared_ptr<AcceptorPool> pool;
ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr::Wildcard(), &pool));
ASSERT_OK(pool->Start(kMaxConcurrency));
Sockaddr server_addr = pool->bind_address();
unique_ptr<ServiceIf> service(new GenericCalculatorService());
service_name_ = service->service_name();
service_pool_ = new BogusServicePool(std::move(service),
server_messenger_->metric_entity(),
kMaxConcurrency);
ASSERT_OK(service_pool_->Init(n_worker_threads_));
ASSERT_OK(server_messenger_->RegisterService(service_name_, service_pool_));
constexpr int kNumThreads = 3;
thread threads[kNumThreads];
Status status[kNumThreads];
CountDownLatch latch(1);
for (int i = 0; i < kNumThreads; i++) {
auto* my_status = &status[i];
threads[i] = thread([this, server_addr, my_status, &latch]() {
this->SingleCall(server_addr, GenericCalculatorService::kAddMethodName,
my_status, &latch);
});
}
// One call should fail immediately due to backpressure. The latch is initialized
// to 1, so it waits only for the first of the three threads to finish.
latch.Wait();
// The rest would time out after 10 sec, but we help them along.
ASSERT_OK(server_messenger_->UnregisterService(service_name_));
service_pool_->Shutdown();
server_messenger_->Shutdown();
for (auto& t : threads) {
t.join();
}
// Verify that exactly one error was due to backpressure and the rest to shutdown.
int errors_backpressure = 0;
int errors_shutdown = 0;
for (const auto& s : status) {
IncrementBackpressureOrShutdown(&s, &errors_backpressure, &errors_shutdown);
}
ASSERT_EQ(1, errors_backpressure);
ASSERT_EQ(2, errors_shutdown);
// Check that the RPC queue overflow metric reflects the single rejected call.
Counter* rpcs_queue_overflow =
METRIC_rpcs_queue_overflow.Instantiate(server_messenger_->metric_entity()).get();
ASSERT_EQ(1, rpcs_queue_overflow->value());
}
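// Test that the per-method queue overflow rejection counters are incremented for
// the overflowed method (Sleep) while counters for other methods (Add) stay at zero.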
TEST_F(MultiThreadedRpcTest, PerMethodQueueOverflowRejectionCounter) {
n_acceptor_pool_threads_ = 1;
n_server_reactor_threads_ = 1;
n_worker_threads_ = 1;
service_queue_length_ = 1;
Sockaddr server_addr;
ASSERT_OK(StartTestServerWithGeneratedCode(&server_addr, false /*enable_ssl*/));
auto* rpcs_queue_overflow = METRIC_rpcs_queue_overflow.Instantiate(
server_messenger_->metric_entity()).get();
ASSERT_EQ(0, rpcs_queue_overflow->value());
auto* queue_overflow_rejections_add =
METRIC_queue_overflow_rejections_kudu_rpc_test_CalculatorService_Add.Instantiate(
server_messenger_->metric_entity()).get();
ASSERT_EQ(0, queue_overflow_rejections_add->value());
auto* queue_overflow_rejections_sleep =
METRIC_queue_overflow_rejections_kudu_rpc_test_CalculatorService_Sleep.Instantiate(
server_messenger_->metric_entity()).get();
ASSERT_EQ(0, queue_overflow_rejections_sleep->value());
// Even allowing for scheduling anomalies, 16 concurrent requests should be
// enough to overflow an RPC service queue of size 1 that is drained by a single
// worker thread, given that each request takes at least 100ms to complete.
constexpr size_t kNumThreads = 16;
vector<thread> threads;
threads.reserve(kNumThreads);
vector<Status> status(kNumThreads);
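// A barrier to release all threads at once, so their RPCs hit the size-1
// service queue as close to simultaneously as possible.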
Barrier barrier(kNumThreads);
for (size_t i = 0; i < kNumThreads; ++i) {
const size_t idx = i;
threads.emplace_back([&, idx]() {
shared_ptr<Messenger> client_messenger;
CHECK_OK(CreateMessenger("ClientMessenger", &client_messenger));
Proxy p(client_messenger, server_addr, server_addr.host(),
CalculatorService::static_service_name());
SleepRequestPB req;
req.set_sleep_micros(100 * 1000); // 100ms
SleepResponsePB resp;
RpcController controller;
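// A generous timeout: calls admitted to the queue are served serially at
// ~100ms each, so they complete well within 10 seconds.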
controller.set_timeout(MonoDelta::FromMilliseconds(10000));
barrier.Wait();
status[idx] = p.SyncRequest(
GenericCalculatorService::kSleepMethodName, req, &resp, &controller);
});
}
for (auto& t : threads) {
t.join();
}
size_t num_errors = 0;
for (const auto& s : status) {
if (!s.ok()) {
++num_errors;
}
}
// Check that there were some RPC queue overflows.
const auto queue_overflow_num = rpcs_queue_overflow->value();
ASSERT_GT(num_errors, 0);
ASSERT_EQ(num_errors, queue_overflow_num);
// Check corresponding per-method rejection counters.
ASSERT_EQ(0, queue_overflow_rejections_add->value());
ASSERT_EQ(queue_overflow_num, queue_overflow_rejections_sleep->value());
}
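// Repeatedly open and immediately close TCP connections to 'addr' until a
// connect attempt fails with a network error, indicating server shutdown.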
static void HammerServerWithTCPConns(const Sockaddr& addr) {
while (true) {
Socket socket;
CHECK_OK(socket.Init(addr.family(), 0));
Status s;
LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") {
s = socket.Connect(addr);
}
if (!s.ok()) {
CHECK(s.IsNetworkError()) << "Unexpected error: " << s.ToString();
return;
}
CHECK_OK(socket.Close());
}
}
// Regression test for KUDU-128: shut down the server while new TCP connections
// are still arriving.
TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) {
// Set up server.
Sockaddr server_addr;
ASSERT_OK(StartTestServer(&server_addr));
// Start a number of threads which just hammer the server with TCP connections.
constexpr int kNumThreads = 8;
vector<thread> threads;
threads.reserve(kNumThreads);
for (int i = 0; i < kNumThreads; i++) {
threads.emplace_back([server_addr]() { HammerServerWithTCPConns(server_addr); });
}
// Sleep until the server has started to actually accept some connections from the
// test threads.
scoped_refptr<Counter> conns_accepted =
METRIC_rpc_connections_accepted.Instantiate(server_messenger_->metric_entity());
while (conns_accepted->value() == 0) {
SleepFor(MonoDelta::FromMicroseconds(100));
}
// Shutdown while there are still new connections appearing.
ASSERT_OK(server_messenger_->UnregisterService(service_name_));
service_pool_->Shutdown();
server_messenger_->Shutdown();
for (auto& t : threads) {
t.join();
}
}
} // namespace rpc
} // namespace kudu