blob: 7427850c84d020fbefa082c4cb27153152a57789 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/rpc-test-base.h"
#include <cstddef>
#include <memory>
#include <ostream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/acceptor_pool.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/proxy.h"
#include "kudu/rpc/rpc_service.h"
#include "kudu/rpc/service_if.h"
#include "kudu/rpc/service_pool.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/thread.h"
METRIC_DECLARE_counter(rpc_connections_accepted);
METRIC_DECLARE_counter(rpcs_queue_overflow);
using std::string;
using std::shared_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace rpc {
class MultiThreadedRpcTest : public RpcTestBase {
public:
// Make a single RPC call.
void SingleCall(Sockaddr server_addr, const char* method_name,
Status* result, CountDownLatch* latch) {
LOG(INFO) << "Connecting to " << server_addr.ToString();
shared_ptr<Messenger> client_messenger;
CHECK_OK(CreateMessenger("ClientSC", &client_messenger));
Proxy p(client_messenger, server_addr, server_addr.host(),
GenericCalculatorService::static_service_name());
*result = DoTestSyncCall(p, method_name);
latch->CountDown();
}
// Make RPC calls until we see a failure.
void HammerServer(Sockaddr server_addr, const char* method_name,
Status* last_result) {
shared_ptr<Messenger> client_messenger;
CHECK_OK(CreateMessenger("ClientHS", &client_messenger));
HammerServerWithMessenger(server_addr, method_name, last_result, client_messenger);
}
void HammerServerWithMessenger(
Sockaddr server_addr, const char* method_name, Status* last_result,
const shared_ptr<Messenger>& messenger) {
LOG(INFO) << "Connecting to " << server_addr.ToString();
Proxy p(messenger, server_addr, server_addr.host(),
GenericCalculatorService::static_service_name());
int i = 0;
while (true) {
i++;
Status s = DoTestSyncCall(p, method_name);
if (!s.ok()) {
// Return on first failure.
LOG(INFO) << "Call failed. Shutting down client thread. Ran " << i << " calls: "
<< s.ToString();
*last_result = s;
return;
}
}
}
};
static void AssertShutdown(kudu::Thread* thread, const Status* status) {
ASSERT_OK(ThreadJoiner(thread).warn_every_ms(500).Join());
string msg = status->ToString();
ASSERT_TRUE(msg.find("Service unavailable") != string::npos ||
msg.find("Network error") != string::npos)
<< "Status is actually: " << msg;
}
// Test making several concurrent RPC calls while shutting down.
// Simply verify that we don't hit any CHECK errors.
TEST_F(MultiThreadedRpcTest, TestShutdownDuringService) {
// Set up server.
Sockaddr server_addr;
ASSERT_OK(StartTestServer(&server_addr));
const int kNumThreads = 4;
scoped_refptr<kudu::Thread> threads[kNumThreads];
Status statuses[kNumThreads];
for (int i = 0; i < kNumThreads; i++) {
ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
&MultiThreadedRpcTest::HammerServer, this, server_addr,
GenericCalculatorService::kAddMethodName, &statuses[i], &threads[i]));
}
SleepFor(MonoDelta::FromMilliseconds(50));
// Shut down server.
ASSERT_OK(server_messenger_->UnregisterService(service_name_));
service_pool_->Shutdown();
server_messenger_->Shutdown();
for (int i = 0; i < kNumThreads; i++) {
AssertShutdown(threads[i].get(), &statuses[i]);
}
}
// Test shutting down the client messenger exactly as a thread is about to start
// a new connection. This is a regression test for KUDU-104.
TEST_F(MultiThreadedRpcTest, TestShutdownClientWhileCallsPending) {
// Set up server.
Sockaddr server_addr;
ASSERT_OK(StartTestServer(&server_addr));
shared_ptr<Messenger> client_messenger;
ASSERT_OK(CreateMessenger("Client", &client_messenger));
scoped_refptr<kudu::Thread> thread;
Status status;
ASSERT_OK(kudu::Thread::Create("test", "test",
&MultiThreadedRpcTest::HammerServerWithMessenger, this, server_addr,
GenericCalculatorService::kAddMethodName, &status, client_messenger, &thread));
// Shut down the messenger after a very brief sleep. This often will race so that the
// call gets submitted to the messenger before shutdown, but the negotiation won't have
// started yet. In a debug build this fails about half the time without the bug fix.
// See KUDU-104.
SleepFor(MonoDelta::FromMicroseconds(10));
client_messenger->Shutdown();
client_messenger.reset();
ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
ASSERT_TRUE(status.IsAborted() ||
status.IsServiceUnavailable());
string msg = status.ToString();
SCOPED_TRACE(msg);
ASSERT_TRUE(msg.find("Client RPC Messenger shutting down") != string::npos ||
msg.find("reactor is shutting down") != string::npos ||
msg.find("Unable to start connection negotiation thread") != string::npos)
<< "Status is actually: " << msg;
}
// This bogus service pool leaves the service queue full.
class BogusServicePool : public ServicePool {
public:
BogusServicePool(gscoped_ptr<ServiceIf> service,
const scoped_refptr<MetricEntity>& metric_entity,
size_t service_queue_length)
: ServicePool(std::move(service), metric_entity, service_queue_length) {
}
virtual Status Init(int num_threads) OVERRIDE {
// Do nothing
return Status::OK();
}
};
void IncrementBackpressureOrShutdown(const Status* status, int* backpressure, int* shutdown) {
string msg = status->ToString();
if (msg.find("service queue is full") != string::npos) {
++(*backpressure);
} else if (msg.find("shutting down") != string::npos) {
++(*shutdown);
} else if (msg.find("got EOF from remote") != string::npos) {
++(*shutdown);
} else {
FAIL() << "Unexpected status message: " << msg;
}
}
// Test that we get a Service Unavailable error when we max out the incoming RPC service queue.
TEST_F(MultiThreadedRpcTest, TestBlowOutServiceQueue) {
const size_t kMaxConcurrency = 2;
MessengerBuilder bld("messenger1");
bld.set_num_reactors(kMaxConcurrency);
bld.set_metric_entity(metric_entity_);
CHECK_OK(bld.Build(&server_messenger_));
shared_ptr<AcceptorPool> pool;
ASSERT_OK(server_messenger_->AddAcceptorPool(Sockaddr(), &pool));
ASSERT_OK(pool->Start(kMaxConcurrency));
Sockaddr server_addr = pool->bind_address();
gscoped_ptr<ServiceIf> service(new GenericCalculatorService());
service_name_ = service->service_name();
service_pool_ = new BogusServicePool(std::move(service),
server_messenger_->metric_entity(),
kMaxConcurrency);
ASSERT_OK(service_pool_->Init(n_worker_threads_));
server_messenger_->RegisterService(service_name_, service_pool_);
scoped_refptr<kudu::Thread> threads[3];
Status status[3];
CountDownLatch latch(1);
for (int i = 0; i < 3; i++) {
ASSERT_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
&MultiThreadedRpcTest::SingleCall, this, server_addr,
GenericCalculatorService::kAddMethodName, &status[i], &latch, &threads[i]));
}
// One should immediately fail due to backpressure. The latch is only initialized
// to wait for the first of three threads to finish.
latch.Wait();
// The rest would time out after 10 sec, but we help them along.
ASSERT_OK(server_messenger_->UnregisterService(service_name_));
service_pool_->Shutdown();
server_messenger_->Shutdown();
for (const auto& thread : threads) {
ASSERT_OK(ThreadJoiner(thread.get()).warn_every_ms(500).Join());
}
// Verify that one error was due to backpressure.
int errors_backpressure = 0;
int errors_shutdown = 0;
for (const auto& s : status) {
IncrementBackpressureOrShutdown(&s, &errors_backpressure, &errors_shutdown);
}
ASSERT_EQ(1, errors_backpressure);
ASSERT_EQ(2, errors_shutdown);
// Check that RPC queue overflow metric is 1
Counter *rpcs_queue_overflow =
METRIC_rpcs_queue_overflow.Instantiate(server_messenger_->metric_entity()).get();
ASSERT_EQ(1, rpcs_queue_overflow->value());
}
static void HammerServerWithTCPConns(const Sockaddr& addr) {
while (true) {
Socket socket;
CHECK_OK(socket.Init(0));
Status s;
LOG_SLOW_EXECUTION(INFO, 100, "Connect took long") {
s = socket.Connect(addr);
}
if (!s.ok()) {
CHECK(s.IsNetworkError()) << "Unexpected error: " << s.ToString();
return;
}
CHECK_OK(socket.Close());
}
}
// Regression test for KUDU-128.
// Test that shuts down the server while new TCP connections are incoming.
TEST_F(MultiThreadedRpcTest, TestShutdownWithIncomingConnections) {
// Set up server.
Sockaddr server_addr;
ASSERT_OK(StartTestServer(&server_addr));
// Start a number of threads which just hammer the server with TCP connections.
vector<scoped_refptr<kudu::Thread> > threads;
for (int i = 0; i < 8; i++) {
scoped_refptr<kudu::Thread> new_thread;
CHECK_OK(kudu::Thread::Create("test", strings::Substitute("t$0", i),
&HammerServerWithTCPConns, server_addr, &new_thread));
threads.push_back(new_thread);
}
// Sleep until the server has started to actually accept some connections from the
// test threads.
scoped_refptr<Counter> conns_accepted =
METRIC_rpc_connections_accepted.Instantiate(server_messenger_->metric_entity());
while (conns_accepted->value() == 0) {
SleepFor(MonoDelta::FromMicroseconds(100));
}
// Shutdown while there are still new connections appearing.
ASSERT_OK(server_messenger_->UnregisterService(service_name_));
service_pool_->Shutdown();
server_messenger_->Shutdown();
for (scoped_refptr<kudu::Thread>& t : threads) {
ASSERT_OK(ThreadJoiner(t.get()).warn_every_ms(500).Join());
}
}
} // namespace rpc
} // namespace kudu