blob: e1d2c5b941cc51b10cbf7b4c5ea177b43c0963d6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/rpc/rpc-test-base.h"
#include <sys/resource.h>
#include <limits>
#include <memory>
#include <string>
#include <unordered_map>
#include <boost/ptr_container/ptr_vector.hpp>
#include <gtest/gtest.h>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/rpc/constants.h"
#include "kudu/rpc/serialization.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/env.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/test_util.h"
// Histogram metrics defined elsewhere; TestRpcHandlerLatencyMetric looks both
// up in the server messenger's metric entity.
METRIC_DECLARE_histogram(handler_latency_kudu_rpc_test_CalculatorService_Sleep);
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
using std::string;
using std::shared_ptr;
using std::unordered_map;
namespace kudu {
namespace rpc {
// Test fixture for this file. It adds nothing of its own; helpers used by the
// tests below (StartTestServer(), CreateMessenger(), ...) are inherited from
// RpcTestBase.
class TestRpc : public RpcTestBase {};
// Exercises basic Sockaddr behavior: port accessors, ordering, string
// formatting and copy construction.
TEST_F(TestRpc, TestSockaddr) {
  Sockaddr first;
  Sockaddr second;
  first.set_port(1000);
  second.set_port(2000);

  // Ordering ignores the port, so neither address sorts before the other.
  ASSERT_FALSE(first < second);
  ASSERT_FALSE(second < first);

  ASSERT_EQ(1000, first.port());
  ASSERT_EQ(2000, second.port());
  ASSERT_EQ(string("0.0.0.0:1000"), first.ToString());
  ASSERT_EQ(string("0.0.0.0:2000"), second.ToString());

  // A copy must render identically to its source.
  Sockaddr copy_of_first(first);
  ASSERT_EQ(string("0.0.0.0:1000"), copy_of_first.ToString());
}
// Creates a messenger, logs its name, and shuts it straight back down.
TEST_F(TestRpc, TestMessengerCreateDestroy) {
  shared_ptr<Messenger> msgr(CreateMessenger("TestCreateDestroy"));
  LOG(INFO) << "started messenger " << msgr->name();
  msgr->Shutdown();
}
// Test starting and stopping a messenger. This is a regression
// test for a segfault seen in early versions of the RPC code,
// in which shutting down the acceptor would trigger an assert,
// making our tests flaky.
TEST_F(TestRpc, TestAcceptorPoolStartStop) {
  // The original failure was intermittent, so repeat the cycle several times
  // (many more when slow tests are allowed).
  const int iterations = AllowSlowTests() ? 100 : 5;
  for (int iter = 0; iter < iterations; ++iter) {
    shared_ptr<Messenger> messenger(CreateMessenger("TestAcceptorPoolStartStop"));
    shared_ptr<AcceptorPool> acceptors;
    // Bind to a default Sockaddr; the check below confirms that a real
    // (non-zero) port ended up bound.
    ASSERT_OK(messenger->AddAcceptorPool(Sockaddr(), &acceptors));
    Sockaddr bound;
    ASSERT_OK(acceptors->GetBoundAddress(&bound));
    ASSERT_NE(0, bound.port());
    ASSERT_OK(acceptors->Start(2));
    messenger->Shutdown();
  }
}
// Verify that a connection header produced by SerializeConnHeader() is
// accepted by ValidateConnHeader().
TEST_F(TestRpc, TestConnHeaderValidation) {
  // The serialized header is kMagicNumberLength + kHeaderFlagsLength bytes.
  const int conn_hdr_len = kMagicNumberLength + kHeaderFlagsLength;
  uint8_t buf[conn_hdr_len];
  serialization::SerializeConnHeader(buf);
  ASSERT_OK(serialization::ValidateConnHeader(Slice(buf, conn_hdr_len)));
}
// Test making successful RPC calls: ten consecutive synchronous Add calls
// through one proxy must all succeed.
TEST_F(TestRpc, TestCall) {
  // Start the server and remember where it is listening.
  Sockaddr server_addr;
  StartTestServer(&server_addr);

  // Connect a client messenger and proxy to it.
  LOG(INFO) << "Connecting to " << server_addr.ToString();
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  int remaining = 10;
  while (remaining-- > 0) {
    ASSERT_OK(DoTestSyncCall(proxy, GenericCalculatorService::kAddMethodName));
  }
}
// Test that calling a server that is not there fails with a network error
// status (rather than succeeding or hanging).
TEST_F(TestRpc, TestCallToBadServer) {
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  // Default address with port 0; presumably nothing is listening there.
  Sockaddr bad_addr;
  bad_addr.set_port(0);
  Proxy proxy(client_messenger, bad_addr, GenericCalculatorService::static_service_name());

  // Repeat the call so that connection setup and teardown are exercised
  // more than once.
  for (int attempt = 0; attempt < 5; ++attempt) {
    const Status s = DoTestSyncCall(proxy, GenericCalculatorService::kAddMethodName);
    LOG(INFO) << "Status: " << s.ToString();
    ASSERT_TRUE(s.IsNetworkError()) << "unexpected status: " << s.ToString();
  }
}
// Test that RPC calls can be failed with an error status on the server:
// invoking an unknown method yields a remote error mentioning "bad method".
TEST_F(TestRpc, TestInvalidMethodCall) {
  Sockaddr server_addr;
  StartTestServer(&server_addr);

  LOG(INFO) << "Connecting to " << server_addr.ToString();
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  const Status status = DoTestSyncCall(proxy, "ThisMethodDoesNotExist");
  ASSERT_TRUE(status.IsRemoteError()) << "unexpected status: " << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(), "bad method");
}
// Test that the error message returned when connecting to the wrong service
// is reasonable: it should name the missing service and the server.
TEST_F(TestRpc, TestWrongService) {
  Sockaddr server_addr;
  StartTestServer(&server_addr);

  // Point the client at a service name the server does not register.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, "WrongServiceName");

  const Status status = DoTestSyncCall(proxy, "ThisMethodDoesNotExist");
  ASSERT_TRUE(status.IsRemoteError()) << "unexpected status: " << status.ToString();
  ASSERT_STR_CONTAINS(status.ToString(),
                      "Service unavailable: service WrongServiceName "
                      "not registered on TestServer");
}
namespace {

// Returns the soft limit on open file descriptors for this process
// (RLIMIT_NOFILE), clamped to the range of 'int'.
//
// rlim_t is unsigned. An unlimited soft limit (RLIM_INFINITY), or any limit
// larger than INT_MAX, is reported as std::numeric_limits<int>::max(). Narrowing
// such values directly would produce a negative or truncated result, and the
// caller would wrongly conclude the limit is too low.
//
// Aborts (PCHECK) if getrlimit() fails.
int GetOpenFileLimit() {
  struct rlimit limit;
  PCHECK(getrlimit(RLIMIT_NOFILE, &limit) == 0);
  const rlim_t kIntMax = static_cast<rlim_t>(std::numeric_limits<int>::max());
  if (limit.rlim_cur == RLIM_INFINITY || limit.rlim_cur > kIntMax) {
    return std::numeric_limits<int>::max();
  }
  return static_cast<int>(limit.rlim_cur);
}

} // anonymous namespace
// Test that we can still make RPC connections even if many fds are in use.
// This is a regression test for KUDU-650.
TEST_F(TestRpc, TestHighFDs) {
  const int kNumFakeFiles = 3500;
  const int kMinUlimit = kNumFakeFiles + 100;
  // The test needs headroom above kNumFakeFiles descriptors. If the process
  // limit is too low, it logs and returns without checking anything.
  if (GetOpenFileLimit() < kMinUlimit) {
    LOG(INFO) << "Test skipped: must increase ulimit -n to at least " << kMinUlimit;
    return;
  }

  // Hold many descriptors open so that the sockets created below get high fd
  // numbers. The ElementDeleter deletes the files when it goes out of scope.
  vector<RandomAccessFile*> open_files;
  ElementDeleter deleter(&open_files);
  for (int opened = 0; opened < kNumFakeFiles; ++opened) {
    gscoped_ptr<RandomAccessFile> file;
    CHECK_OK(Env::Default()->NewRandomAccessFile("/dev/zero", &file));
    open_files.push_back(file.release());
  }

  // With the fd numbers inflated, start a server and client and make one call.
  Sockaddr server_addr;
  StartTestServer(&server_addr);
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, GenericCalculatorService::static_service_name());
  ASSERT_OK(DoTestSyncCall(proxy, GenericCalculatorService::kAddMethodName));
}
// Test connection keepalive handling. Shortly after a call, the connection is
// still open on both ends. Once it has been idle longer than the keepalive
// time, both ends have closed it.
TEST_F(TestRpc, TestConnectionKeepalive) {
  // Only run one reactor per messenger, so we can grab the metrics from that
  // one without having to check all.
  n_server_reactor_threads_ = 1;
  keepalive_time_ms_ = 50;

  Sockaddr server_addr;
  StartTestServer(&server_addr);

  LOG(INFO) << "Connecting to " << server_addr.ToString();
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, GenericCalculatorService::static_service_name());
  ASSERT_OK(DoTestSyncCall(proxy, GenericCalculatorService::kAddMethodName));

  const auto& server_reactor = server_messenger_->reactors_[0];
  const auto& client_reactor = client_messenger->reactors_[0];
  ReactorMetrics metrics;

  // Well within the 50ms keepalive: one connection, seen as a server
  // connection by the server and as a client connection by the client.
  SleepFor(MonoDelta::FromMilliseconds(5));
  ASSERT_OK(server_reactor->GetMetrics(&metrics));
  ASSERT_EQ(1, metrics.num_server_connections_) << "Server should have 1 server connection";
  ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";
  ASSERT_OK(client_reactor->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
  ASSERT_EQ(1, metrics.num_client_connections_) << "Client should have 1 client connections";

  // Past the keepalive time, the timer should have closed both sides of the
  // connection.
  SleepFor(MonoDelta::FromMilliseconds(100));
  ASSERT_OK(server_reactor->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.num_server_connections_) << "Server should have 0 server connections";
  ASSERT_EQ(0, metrics.num_client_connections_) << "Server should have 0 client connections";
  ASSERT_OK(client_reactor->GetMetrics(&metrics));
  ASSERT_EQ(0, metrics.num_server_connections_) << "Client should have 0 server connections";
  ASSERT_EQ(0, metrics.num_client_connections_) << "Client should have 0 client connections";
}
// Test that a call which takes longer than the keepalive time
// succeeds -- i.e that we don't consider a connection to be "idle" on the
// server if there is a call outstanding on it.
TEST_F(TestRpc, TestCallLongerThanKeepalive) {
  // Keepalive (50ms) is deliberately shorter than the call below (100ms).
  keepalive_time_ms_ = 50;

  Sockaddr server_addr;
  StartTestServer(&server_addr);

  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // A deferred Sleep call of 100ms, i.e. twice the keepalive.
  RpcController rpc;
  SleepRequestPB sleep_req;
  sleep_req.set_sleep_micros(100 * 1000);
  sleep_req.set_deferred(true);
  SleepResponsePB sleep_resp;
  ASSERT_OK(proxy.SyncRequest(GenericCalculatorService::kSleepMethodName,
                              sleep_req, &sleep_resp, &rpc));
}
// Test that the RpcSidecar transfers the expected messages.
TEST_F(TestRpc, TestRpcSidecar) {
  // Set up server.
  Sockaddr server_addr;
  StartTestServer(&server_addr);

  // Set up client.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // If DoTestSidecar() hits a fatal gtest assertion internally,
  // ASSERT_NO_FATAL_FAILURE ends the test at that point. Otherwise it would
  // carry on to the multi-megabyte transfer after the small case had already
  // failed. This matches how TestCallTimeout wraps DoTestExpectTimeout().

  // Test some small sidecars.
  ASSERT_NO_FATAL_FAILURE(DoTestSidecar(p, 123, 456));

  // Test some larger sidecars to verify that we properly handle the case where
  // we can't write the whole response to the socket in a single call.
  ASSERT_NO_FATAL_FAILURE(DoTestSidecar(p, 3000 * 1024, 2000 * 1024));
}
// Test that timeouts are properly handled, both before and after the request
// has been sent.
TEST_F(TestRpc, TestCallTimeout) {
  Sockaddr server_addr;
  StartTestServer(&server_addr);
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // A 1ns timeout is expected to expire while the call is still connecting or
  // sitting in the send queue. This case used to trigger ASAN failures.
  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(proxy, MonoDelta::FromNanoseconds(1)));

  // A 10ms timeout is expected to expire after the request has been sent.
  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(proxy, MonoDelta::FromMilliseconds(10)));
}
// Thread body for a fake server. It accepts one TCP connection on
// 'listen_sock' and then reads and discards data until BlockingRecv() returns
// a non-OK status. Every receive uses a deadline 10 seconds after the accept;
// presumably reaching it, or the peer closing, ends the loop.
static void AcceptAndReadForever(Socket* listen_sock) {
  Socket conn;
  Sockaddr peer;
  CHECK_OK(listen_sock->Accept(&conn, &peer, 0));

  MonoTime give_up_at = MonoTime::Now(MonoTime::FINE);
  give_up_at.AddDelta(MonoDelta::FromSeconds(10));

  size_t bytes_read;
  uint8_t scratch[1024];
  for (;;) {
    if (!conn.BlockingRecv(scratch, sizeof(scratch), &bytes_read, give_up_at).ok()) {
      break;
    }
  }
}
// Starts a fake listening socket which never actually negotiates.
// Ensures that the client gets a reasonable status code in this case.
TEST_F(TestRpc, TestNegotiationTimeout) {
  // A plain socket listener that never speaks the RPC protocol.
  Sockaddr server_addr;
  Socket listen_sock;
  ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));

  // Accept and drain the client's connection on a separate thread.
  scoped_refptr<Thread> acceptor;
  ASSERT_OK(Thread::Create("test", "acceptor",
                           AcceptAndReadForever, &listen_sock,
                           &acceptor));

  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, GenericCalculatorService::static_service_name());
  ASSERT_NO_FATAL_FAILURE(DoTestExpectTimeout(proxy, MonoDelta::FromMilliseconds(100)));

  acceptor->Join();
}
// Test that client calls get failed properly when the server they're connected to
// shuts down.
TEST_F(TestRpc, TestServerShutsDown) {
  // Set up a simple socket server which accepts a connection.
  Sockaddr server_addr;
  Socket listen_sock;
  ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));

  // Set up client.
  LOG(INFO) << "Connecting to " << server_addr.ToString();
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy p(client_messenger, server_addr, GenericCalculatorService::static_service_name());

  // Every call sends this same request.
  AddRequestPB req;
  req.set_x(rand());
  req.set_y(rand());

  // We'll send several calls async, and ensure that they all get the error
  // status when the connection drops. Each call gets its own response object
  // as well as its own controller. The response is an output parameter of the
  // async call, so a single shared one would let concurrently outstanding calls
  // write into the same message. The ptr_vectors own these objects; every call
  // has completed (and counted down 'latch') once latch.Wait() returns.
  boost::ptr_vector<AddResponsePB> responses;
  boost::ptr_vector<RpcController> controllers;
  const int n_calls = 5;
  CountDownLatch latch(n_calls);
  for (int i = 0; i < n_calls; i++) {
    auto resp = new AddResponsePB();
    responses.push_back(resp);
    auto controller = new RpcController();
    controllers.push_back(controller);
    p.AsyncRequest(GenericCalculatorService::kAddMethodName, req, resp, controller,
                   boost::bind(&CountDownLatch::CountDown, boost::ref(latch)));
  }

  // Accept the TCP connection.
  Socket server_sock;
  Sockaddr remote;
  ASSERT_OK(listen_sock.Accept(&server_sock, &remote, 0));

  // Nothing has been sent back on the accepted socket, so the calls are still
  // in progress at this point.
  for (const RpcController& controller : controllers) {
    ASSERT_FALSE(controller.finished());
  }

  // Shut down the socket.
  ASSERT_OK(listen_sock.Close());
  ASSERT_OK(server_sock.Close());

  // Wait for all of the calls to be marked finished.
  latch.Wait();

  // Should get the appropriate error on the client for all calls.
  for (const RpcController& controller : controllers) {
    ASSERT_TRUE(controller.finished());
    Status s = controller.status();
    ASSERT_TRUE(s.IsNetworkError()) <<
      "Unexpected status: " << s.ToString();

    // Any of these errors could happen, depending on whether we were
    // in the middle of sending a call while the connection died, or
    // if we were already waiting for responses.
    //
    // ECONNREFUSED is possible because the sending of the calls is async.
    // For example, the following interleaving:
    // - Enqueue 3 calls
    // - Reactor wakes up, creates connection, starts writing calls
    // - Enqueue 2 more calls
    // - Shut down socket
    // - Reactor wakes up, tries to write more of the first 3 calls, gets error
    // - Reactor shuts down connection
    // - Reactor sees the 2 remaining calls, makes a new connection
    // - Because the socket is shut down, gets ECONNREFUSED.
    //
    // EINVAL is possible if the controller socket had already disconnected by
    // the time it tries to set the SO_SNDTIMEO socket option as part of the
    // normal blocking SASL handshake.
    ASSERT_TRUE(s.posix_code() == EPIPE ||
                s.posix_code() == ECONNRESET ||
                s.posix_code() == ESHUTDOWN ||
                s.posix_code() == ECONNREFUSED ||
                s.posix_code() == EINVAL)
      << "Unexpected status: " << s.ToString();
  }
}
// Test handler latency metric. A single deferred Sleep call of sleep_micros
// should leave exactly one sample in the Sleep handler-latency histogram,
// with a value of at least sleep_micros.
TEST_F(TestRpc, TestRpcHandlerLatencyMetric) {
  const uint64_t sleep_micros = 20 * 1000;

  // Set up server.
  Sockaddr server_addr;
  StartTestServerWithGeneratedCode(&server_addr);

  // Set up client.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());

  RpcController controller;
  SleepRequestPB req;
  req.set_sleep_micros(sleep_micros);
  req.set_deferred(true);
  SleepResponsePB resp;
  ASSERT_OK(p.SyncRequest("Sleep", req, &resp, &controller));

  const unordered_map<const MetricPrototype*, scoped_refptr<Metric> > metric_map =
      server_messenger_->metric_entity()->UnsafeMetricsMapForTests();
  scoped_refptr<Histogram> latency_histogram = down_cast<Histogram *>(
      FindOrDie(metric_map,
                &METRIC_handler_latency_kudu_rpc_test_CalculatorService_Sleep).get());

  LOG(INFO) << "Sleep() min lat: " << latency_histogram->MinValueForTests();
  LOG(INFO) << "Sleep() mean lat: " << latency_histogram->MeanValueForTests();
  LOG(INFO) << "Sleep() max lat: " << latency_histogram->MaxValueForTests();
  LOG(INFO) << "Sleep() #calls: " << latency_histogram->TotalCount();

  ASSERT_EQ(1, latency_histogram->TotalCount());
  ASSERT_GE(latency_histogram->MaxValueForTests(), sleep_micros);
  // With exactly one sample, the min and max must coincide. ASSERT_EQ (rather
  // than ASSERT_TRUE on '==') prints both values if they ever differ.
  ASSERT_EQ(latency_histogram->MinValueForTests(), latency_histogram->MaxValueForTests());

  // TODO: Implement an incoming queue latency test.
  // For now we just assert that the metric exists.
  ASSERT_TRUE(FindOrDie(metric_map, &METRIC_rpc_incoming_queue_time));
}
// RPC completion callback. It resets the shared_ptr pointed to by 'messenger'
// from inside the callback itself, then counts down 'latch' so the waiting
// test can proceed.
static void DestroyMessengerCallback(shared_ptr<Messenger>* messenger,
                                     CountDownLatch* latch) {
  shared_ptr<Messenger>& owner = *messenger;
  owner.reset();
  latch->CountDown();
}
// Test that an RPC completion callback may drop the test's reference to the
// messenger that issued the call. The call targets a default Sockaddr with a
// 1ms timeout; its callback resets 'client_messenger' and releases the latch.
TEST_F(TestRpc, TestRpcCallbackDestroysMessenger) {
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Sockaddr bad_addr;
  CountDownLatch latch(1);

  AddRequestPB req;
  req.set_x(rand());
  req.set_y(rand());
  AddResponsePB resp;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(1));

  // The proxy only lives in this scope, so it is destroyed before the wait.
  {
    Proxy proxy(client_messenger, bad_addr, "xxx");
    proxy.AsyncRequest("my-fake-method", req, &resp, &controller,
                       boost::bind(&DestroyMessengerCallback, &client_messenger, &latch));
  }
  latch.Wait();
}
// Test that setting the client timeout / deadline gets propagated to RPC
// services. With client_timeout_defined set in the request, a call made without
// a controller timeout fails with a remote "Missing required timeout" error.
// The same call succeeds once a timeout is set.
TEST_F(TestRpc, TestRpcContextClientDeadline) {
  const uint64_t sleep_micros = 20 * 1000;

  // Set up server.
  Sockaddr server_addr;
  StartTestServerWithGeneratedCode(&server_addr);

  // Set up client.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());

  SleepRequestPB req;
  req.set_sleep_micros(sleep_micros);
  req.set_client_timeout_defined(true);
  SleepResponsePB resp;
  RpcController controller;

  // First attempt: deliberately no timeout on the controller.
  Status s = p.SyncRequest("Sleep", req, &resp, &controller);
  ASSERT_TRUE(s.IsRemoteError()) << "unexpected status: " << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "Missing required timeout");

  // Second attempt: reuse the controller with a 1s timeout.
  controller.Reset();
  controller.set_timeout(MonoDelta::FromMilliseconds(1000));
  ASSERT_OK(p.SyncRequest("Sleep", req, &resp, &controller));
}
// Test that setting a call-level application feature flag to an unknown value
// makes the server reject the call, while requiring FOO alone succeeds.
TEST_F(TestRpc, TestApplicationFeatureFlag) {
  Sockaddr server_addr;
  StartTestServerWithGeneratedCode(&server_addr);

  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy proxy(client_messenger, server_addr, CalculatorService::static_service_name());

  // Both calls ask for 1 + 2; only the required feature flags differ.
  auto make_request = [] {
    AddRequestPB r;
    r.set_x(1);
    r.set_y(2);
    return r;
  };

  { // Supported flag: requiring FOO succeeds and the sum is returned.
    AddRequestPB req = make_request();
    AddResponsePB resp;
    RpcController controller;
    controller.RequireServerFeature(FeatureFlags::FOO);
    Status s = proxy.SyncRequest("Add", req, &resp, &controller);
    SCOPED_TRACE(strings::Substitute("supported response: $0", s.ToString()));
    ASSERT_TRUE(s.ok());
    ASSERT_EQ(resp.result(), 3);
  }

  { // Unsupported flag: additionally requiring feature 99 gets a remote error.
    AddRequestPB req = make_request();
    AddResponsePB resp;
    RpcController controller;
    controller.RequireServerFeature(FeatureFlags::FOO);
    controller.RequireServerFeature(99);
    Status s = proxy.SyncRequest("Add", req, &resp, &controller);
    SCOPED_TRACE(strings::Substitute("unsupported response: $0", s.ToString()));
    ASSERT_TRUE(s.IsRemoteError());
  }
}
// Test a server started while kSupportedServerRpcFeatureFlags is empty. A call
// that requires FeatureFlags::FOO fails with NotSupported; a call that
// requires no feature succeeds.
TEST_F(TestRpc, TestApplicationFeatureFlagUnsupportedServer) {
  // Clear the global set of supported server feature flags for the duration of
  // the test. The scoped cleanup restores the saved value on every exit path,
  // including assertion failures.
  auto saved_flags = kSupportedServerRpcFeatureFlags;
  auto cleanup = MakeScopedCleanup([&] () { kSupportedServerRpcFeatureFlags = saved_flags; });
  kSupportedServerRpcFeatureFlags = {};

  // Set up server.
  Sockaddr server_addr;
  StartTestServerWithGeneratedCode(&server_addr);

  // Set up client.
  shared_ptr<Messenger> client_messenger(CreateMessenger("Client"));
  Proxy p(client_messenger, server_addr, CalculatorService::static_service_name());

  { // Required flag
    AddRequestPB req;
    req.set_x(1);
    req.set_y(2);
    AddResponsePB resp;
    RpcController controller;
    controller.RequireServerFeature(FeatureFlags::FOO);
    Status s = p.SyncRequest("Add", req, &resp, &controller);
    SCOPED_TRACE(strings::Substitute("required flag response: $0", s.ToString()));
    ASSERT_TRUE(s.IsNotSupported());
  }

  { // No required flag
    AddRequestPB req;
    req.set_x(1);
    req.set_y(2);
    AddResponsePB resp;
    RpcController controller;
    Status s = p.SyncRequest("Add", req, &resp, &controller);
    SCOPED_TRACE(strings::Substitute("no required flag response: $0", s.ToString()));
    ASSERT_TRUE(s.ok());
  }
}
} // namespace rpc
} // namespace kudu