// blob: 58735bdedacd348328e9c478ebf4554f5bc26ba6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags_declare.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_bool(client_use_unix_domain_sockets);
DECLARE_string(dns_addr_resolution_override);
DECLARE_string(host_for_tests);
METRIC_DECLARE_counter(rpc_connections_accepted_unix_domain_socket);
METRIC_DECLARE_entity(server);
using kudu::client::KuduClient;
using kudu::client::KuduTabletServer;
using kudu::cluster::ExternalDaemon;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::pb_util::SecureShortDebugString;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;
namespace kudu {
namespace itest {
namespace {
// Alias prefixes handed to the external mini cluster for its masters and
// tablet servers. The fixture below builds DNS override entries of the form
// "<prefix>.<index>=<address>" from them.
constexpr const char* kMasterHostPrefix = "master.host";
constexpr const char* kTServerHostPrefix = "tserver.host";
} // anonymous namespace
class DnsAliasITest : public KuduTest {
public:
void SetUp() override {
SetUpCluster();
}
// TODO(awong): more plumbing is needed to allow the server to be restarted
// bound to a different address with the webserver, so just disable it.
void SetUpCluster(vector<string> extra_master_flags = { "--webserver_enabled=false" },
vector<string> extra_tserver_flags = { "--webserver_enabled=false" }) {
ExternalMiniClusterOptions opts;
opts.num_masters = 3;
opts.num_tablet_servers = 3;
opts.extra_master_flags = std::move(extra_master_flags);
opts.extra_tserver_flags = std::move(extra_tserver_flags);
opts.master_alias_prefix = kMasterHostPrefix;
opts.tserver_alias_prefix = kTServerHostPrefix;
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
ASSERT_OK(cluster_->Start());
FLAGS_dns_addr_resolution_override = cluster_->dns_overrides();
ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
}
void TearDown() override {
NO_FATALS(cluster_->AssertNoCrashes());
}
// Get the new DNS override string when restarting the last node of the given
// daemon type with the given reserved address.
string GetNewOverridesFlag(ExternalMiniCluster::DaemonType node_type,
const Sockaddr& new_addr) {
int master_end_idx = cluster_->num_masters();
int tserver_end_idx = cluster_->num_tablet_servers();
bool is_master = node_type == ExternalMiniCluster::DaemonType::MASTER;
if (is_master) {
--master_end_idx;
} else {
--tserver_end_idx;
}
vector<string> new_overrides;
new_overrides.reserve(cluster_->num_masters() + cluster_->num_tablet_servers());
for (int i = 0; i < master_end_idx; i++) {
new_overrides.emplace_back(Substitute("$0.$1=$2", kMasterHostPrefix, i,
cluster_->master(i)->bound_rpc_addr().ToString()));
}
for (int i = 0; i < tserver_end_idx; i++) {
new_overrides.emplace_back(
Substitute("$0.$1=$2", kTServerHostPrefix, i,
cluster_->tablet_server(i)->bound_rpc_addr().ToString()));
}
new_overrides.emplace_back(
Substitute("$0.$1=$2", is_master ? kMasterHostPrefix : kTServerHostPrefix,
is_master ? master_end_idx : tserver_end_idx,
new_addr.ToString()));
return JoinStrings(new_overrides, ",");
}
// Adds the appropriate flags for the given daemon to be restarted bound to
// the given address.
void SetUpDaemonForNewAddr(const Sockaddr& new_addr, const string& new_overrides_str,
ExternalDaemon* daemon) {
HostPort new_ip_hp(new_addr.host(), new_addr.port());
daemon->SetRpcBindAddress(new_ip_hp);
daemon->mutable_flags()->emplace_back("--rpc_reuseport=true");
daemon->mutable_flags()->emplace_back(
Substitute("--dns_addr_resolution_override=$0", new_overrides_str));
}
// Sets the flags on all nodes in the cluster, except for the last node of
// the given 'node_type', which is expected to have been restarted with the
// appropriate flags.
void SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType node_type,
const string& new_overrides_str) {
int master_end_idx = cluster_->num_masters();
int tserver_end_idx = cluster_->num_tablet_servers();
if (node_type == ExternalMiniCluster::DaemonType::MASTER) {
--master_end_idx;
} else {
--tserver_end_idx;
}
for (int i = 0; i < master_end_idx; i++) {
ASSERT_OK(cluster_->SetFlag(
cluster_->master(i), "dns_addr_resolution_override", new_overrides_str));
}
for (int i = 0; i < tserver_end_idx; i++) {
ASSERT_OK(cluster_->SetFlag(
cluster_->tablet_server(i), "dns_addr_resolution_override", new_overrides_str));
}
}
protected:
unique_ptr<ExternalMiniCluster> cluster_;
client::sp::shared_ptr<KuduClient> client_;
};
// Checks that the client reports aliases rather than numeric addresses for
// masters and tablet servers, and that a workload with both writers and
// readers runs against the aliased cluster.
TEST_F(DnsAliasITest, TestBasic) {
  // Based on the mini-cluster setup, the client should report the aliases.
  const auto master_addrs_str = client_->GetMasterAddresses();
  vector<string> master_addrs = Split(master_addrs_str, ",");
  ASSERT_EQ(cluster_->num_masters(), master_addrs.size()) << master_addrs_str;
  for (const auto& master_addr : master_addrs) {
    ASSERT_STR_CONTAINS(master_addr, kMasterHostPrefix);

    // Parsing the reported value as a numeric IP should fail, since the
    // returned values should be aliased.
    Sockaddr parsed;
    const Status parse_status = parsed.ParseString(master_addr, 0);
    ASSERT_FALSE(parse_status.ok());
  }

  // The same should hold for the hostnames of the tablet servers.
  vector<KuduTabletServer*> tservers;
  ElementDeleter deleter(&tservers);
  ASSERT_OK(client_->ListTabletServers(&tservers));
  for (const auto* ts : tservers) {
    ASSERT_STR_CONTAINS(ts->hostname(), kTServerHostPrefix);

    Sockaddr parsed;
    const Status parse_status = parsed.ParseString(ts->hostname(), 0);
    ASSERT_FALSE(parse_status.ok());
  }

  // Running a test workload should succeed. Have the workload perform both
  // scans and writes to exercise the aliasing codepaths of each.
  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(3);
  workload.set_num_write_threads(1);
  workload.set_num_read_threads(3);
  workload.Setup();
  workload.Start();
  while (workload.rows_inserted() < 10) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();
}
class DnsAliasWithUnixSocketsITest : public DnsAliasITest {
public:
void SetUp() override {
// Configure --host_for_tests in this process so the test client will think
// it's local to a tserver.
FLAGS_host_for_tests = Substitute("$0.$1", kTServerHostPrefix, kTServerIdxWithLocalClient);
FLAGS_client_use_unix_domain_sockets = true;
SetUpCluster({ "--rpc_listen_on_unix_domain_socket=true" },
{ "--rpc_listen_on_unix_domain_socket=true" });
}
protected:
const int kTServerIdxWithLocalClient = 0;
};
// Runs a workload and then checks, per tablet server, the counter of RPC
// connections accepted over a Unix domain socket: it should be positive only
// on the tablet server the client believes it is local to.
TEST_F(DnsAliasWithUnixSocketsITest, TestBasic) {
  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(3);
  workload.set_num_write_threads(1);
  workload.set_num_read_threads(3);
  workload.Setup();
  workload.Start();
  while (workload.rows_inserted() < 10) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();

  for (int ts_idx = 0; ts_idx < cluster_->num_tablet_servers(); ++ts_idx) {
    // Curl doesn't know about our DNS aliasing, so resolve the address and
    // fetch the metric from the proper address.
    vector<Sockaddr> resolved;
    ASSERT_OK(cluster_->tablet_server(ts_idx)->bound_http_hostport().ResolveAddresses(&resolved));
    ASSERT_EQ(1, resolved.size());
    const auto& http_addr = resolved[0];

    int64_t unix_connections = 0;
    ASSERT_OK(GetInt64Metric(HostPort(http_addr.host(), http_addr.port()),
                             &METRIC_ENTITY_server, nullptr,
                             &METRIC_rpc_connections_accepted_unix_domain_socket,
                             "value", &unix_connections));
    if (ts_idx != kTServerIdxWithLocalClient) {
      // Tablet servers the client is not local to should have seen no Unix
      // domain socket connections.
      ASSERT_EQ(0, unix_connections);
    } else {
      ASSERT_LT(0, unix_connections);
    }
  }
}
// These tests depend on restarted servers being assigned a new IP address. On
// macOS, tservers are all assigned the same address, so don't run them there.
#if defined(__linux__)
// Regression test for KUDU-1620, wherein consensus proxies don't eventually
// succeed when the address changes but the host/ports stays the same.
//
// The last tablet server is restarted bound to a newly reserved address that
// reuses its old port. ksck is expected to fail until both the remaining
// daemons and this test process have been given the updated DNS overrides.
TEST_F(DnsAliasITest, Kudu1620) {
  // Insert some rows into a table with three replicas before re-addressing.
  TestWorkload w(cluster_.get());
  w.set_num_replicas(3);
  w.set_num_write_threads(1);
  w.Setup();
  w.Start();
  while (w.rows_inserted() < 10) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  w.StopAndJoin();

  // Shut down a tablet server and start one up at a different IP.
  auto* tserver = cluster_->tablet_server(cluster_->num_tablet_servers() - 1);
  tserver->Shutdown();
  // Reserve a socket under daemon index 3, which no running tablet server
  // uses (they are indexed 0-2), while keeping the old RPC port.
  unique_ptr<Socket> reserved_socket;
  ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::TSERVER, 3,
                                          kDefaultBindMode, &reserved_socket,
                                          tserver->bound_rpc_hostport().port()));
  Sockaddr new_addr;
  ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr));

  // Restart the tablet server at the new address. At this point only the
  // restarted tablet server has been given the new overrides.
  const auto new_overrides_str =
      GetNewOverridesFlag(ExternalMiniCluster::DaemonType::TSERVER, new_addr);
  SetUpDaemonForNewAddr(new_addr, new_overrides_str, tserver);
  ASSERT_OK(tserver->Restart());

  // Running ksck should fail because the existing servers are still trying to
  // communicate with the old address.
  ClusterVerifier v(cluster_.get());
  Status s = v.RunKsck();
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

  // Let the other servers start communicating with the new tserver. The helper
  // asserts internally, so use NO_FATALS to stop this test if setting a flag
  // fails instead of continuing with a partially updated cluster.
  NO_FATALS(SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::TSERVER,
                                       new_overrides_str));

  // Our test thread still thinks the old alias is still valid, so our ksck
  // should fail.
  s = v.RunKsck();
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

  // Once we set the DNS aliases in the test thread, ksck should succeed.
  FLAGS_dns_addr_resolution_override = new_overrides_str;
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(v.RunKsck());
  });
}
// Master-side regression test for KUDU-1620. Masters instantiate consensus
// proxies to get the UUIDs of its peers. With KUDU-1620 resolved, the proxy
// used should be able to re-resolve and retry upon failure, rather than
// retrying at the same address.
//
// The last master is prepared for a new address but kept down while the other
// masters are restarted; ksck is expected to fail until it is restarted.
TEST_F(DnsAliasITest, TestMasterReresolveOnStartup) {
  const int last_master_idx = cluster_->num_masters() - 1;
  auto* master = cluster_->master(last_master_idx);

  // Shut down and prepare the node that we're going to give a new address.
  master->Shutdown();
  // Reserve a socket under daemon index 3, which no running master uses (they
  // are indexed 0-2), while keeping the old RPC port.
  unique_ptr<Socket> reserved_socket;
  ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::MASTER, 3,
                                          kDefaultBindMode, &reserved_socket,
                                          master->bound_rpc_hostport().port()));
  Sockaddr new_addr;
  ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr));
  const auto new_overrides_str =
      GetNewOverridesFlag(ExternalMiniCluster::DaemonType::MASTER, new_addr);
  SetUpDaemonForNewAddr(new_addr, new_overrides_str, master);

  // Shut down the other masters so we can test what happens when they come
  // back up.
  for (int i = 0; i < last_master_idx; i++) {
    cluster_->master(i)->Shutdown();
  }
  for (int i = 0; i < last_master_idx; i++) {
    ASSERT_OK(cluster_->master(i)->Restart());
  }

  // Since the rest of the cluster doesn't know about the address, ksck will
  // fail.
  ClusterVerifier v(cluster_.get());
  Status s = v.RunKsck();
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

  // Even upon setting the DNS overrides on the rest of the nodes, since the
  // master hasn't started, we should still see an error. The helper asserts
  // internally, so use NO_FATALS to stop this test if setting a flag fails.
  NO_FATALS(SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::MASTER,
                                       new_overrides_str));
  s = v.RunKsck();
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

  // The same holds after updating the overrides in this test process.
  FLAGS_dns_addr_resolution_override = new_overrides_str;
  s = v.RunKsck();
  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();

  // Upon restarting the node, the other masters should be able to resolve and
  // connect to it.
  ASSERT_OK(master->Restart());
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(v.RunKsck());
  });
}
// Regression test for KUDU-1885, wherein tserver proxies on the masters don't
// eventually succeed when the tserver's address changes.
//
// Tables are created while the last tablet server is down; it is then
// restarted at a new address, and the test checks that it ends up hosting
// tablets once the rest of the cluster sees the new DNS overrides.
TEST_F(DnsAliasITest, Kudu1885) {
  // First, wait for all tablet servers to report to the masters.
  ASSERT_EVENTUALLY([&] {
    vector<KuduTabletServer*> tservers;
    ElementDeleter deleter(&tservers);
    ASSERT_OK(client_->ListTabletServers(&tservers));
    ASSERT_EQ(cluster_->num_tablet_servers(), tservers.size());
  });

  auto* tserver = cluster_->tablet_server(cluster_->num_tablet_servers() - 1);

  // Shut down a tablet server so we can start it up with a different address.
  tserver->Shutdown();
  // Reserve a socket under daemon index 3, which no running tablet server
  // uses (they are indexed 0-2), while keeping the old RPC port.
  unique_ptr<Socket> reserved_socket;
  ASSERT_OK(cluster_->ReserveDaemonSocket(cluster::ExternalMiniCluster::DaemonType::TSERVER, 3,
                                          kDefaultBindMode, &reserved_socket,
                                          tserver->bound_rpc_hostport().port()));
  Sockaddr new_addr;
  ASSERT_OK(reserved_socket->GetSocketAddress(&new_addr));
  const auto new_overrides_str =
      GetNewOverridesFlag(ExternalMiniCluster::DaemonType::TSERVER, new_addr);
  SetUpDaemonForNewAddr(new_addr, new_overrides_str, tserver);

  // Create several tables. Based on Kudu's tablet placement algorithm, some
  // should be assigned to the tserver with a new address. This will start some
  // tasks on the master to send requests to tablet servers (some of which will
  // fail because of the down server).
  // NOTE: masters will wait up to --tserver_unresponsive_timeout_ms before
  // stopping replica placement on the down server. By default, this is 60
  // seconds, so we can proceed expecting placement on the down tserver.
  for (int i = 0; i < 10; i++) {
    TestWorkload w(cluster_.get());
    w.set_table_name(Substitute("default.table_$0", i));
    w.set_num_replicas(1);
    // Some tablet creations will initially fail until we restart the down
    // server, so have our client not wait for creation to finish.
    w.set_wait_for_create(false);
    w.Setup();
  }
  ASSERT_OK(tserver->Restart());

  // Allow the rest of the cluster to start seeing the re-addressed server. The
  // helper asserts internally, so use NO_FATALS to stop this test if setting a
  // flag fails instead of waiting on a partially updated cluster.
  NO_FATALS(SetFlagsOnRemainingCluster(ExternalMiniCluster::DaemonType::TSERVER,
                                       new_overrides_str));
  FLAGS_dns_addr_resolution_override = new_overrides_str;
  ClusterVerifier v(cluster_.get());
  ASSERT_EVENTUALLY([&] {
    ASSERT_OK(v.RunKsck());
  });

  // Ensure there's no funny business with the tserver coming up at a new
  // address -- we should have the same number of tablet servers.
  vector<KuduTabletServer*> tservers;
  ElementDeleter deleter(&tservers);
  ASSERT_OK(client_->ListTabletServers(&tservers));
  ASSERT_EQ(cluster_->num_tablet_servers(), tservers.size());

  // Some tablets should be assigned to the tablet server we re-addressed, and
  // the create tablet requests from the masters should have been routed as
  // appropriate.
  auto tserver_proxy = cluster_->tserver_proxy(cluster_->num_tablet_servers() - 1);
  tserver::ListTabletsRequestPB req;
  req.set_need_schema_info(false);
  tserver::ListTabletsResponsePB resp;
  rpc::RpcController controller;
  ASSERT_OK(tserver_proxy->ListTablets(req, &resp, &controller));
  ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp.error());
  ASSERT_GT(resp.status_and_schema_size(), 0);
}
#endif
} // namespace itest
} // namespace kudu