blob: 05d0ad32f2439db2ea394a60f0fdac83570b5542 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <array>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <set>
#include <string>
#include <type_traits>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/data_gen_util.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/util/env.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/path_util.h"
#include "kudu/util/random.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/subprocess.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using kudu::Fifo;
using kudu::client::KuduClient;
using kudu::client::KuduInsert;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableAlterer;
using kudu::client::KuduTabletServer;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using std::array;
using std::function;
using std::set;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;
namespace kudu {
// Class template for test scenarios running against external mini-cluster
// with M masters and T tablet servers.
template<size_t M, size_t T>
class ClientProxiedRpcTest : public KuduTest {
public:
void SetUp() override {
SKIP_IF_SLOW_NOT_ALLOWED();
auto s = FindExecutable("nc", {"/bin", "/usr/bin", "/usr/local/bin"}, &nc_);
if (s.IsNotFound()) {
LOG(WARNING) << "test is skipped: could not find netcat utility (nc)";
GTEST_SKIP();
}
ASSERT_OK(s);
KuduTest::SetUp();
ExternalMiniClusterOptions opts;
opts.num_masters = M;
opts.num_tablet_servers = T;
vector<string> master_addrs;
for (size_t i = 0; i < M; ++i) {
auto& a_port = m_proxy_advertised_ports_[i];
ASSERT_OK(GetRandomPort(kIpAddr, &a_port));
auto& a_addr = m_proxy_advertised_addrs_[i];
a_addr = HostPort(kIpAddr, a_port);
auto& p_port = m_proxied_ports_[i];
ASSERT_OK(GetRandomPort(kIpAddr, &p_port));
auto& p_addr = m_proxied_addrs_[i];
p_addr = HostPort(kIpAddr, p_port);
vector<string> flags = {
Substitute("--rpc_proxy_advertised_addresses=$0", a_addr.ToString()),
Substitute("--rpc_proxied_addresses=$0", p_addr.ToString()),
};
opts.m_custom_flags.emplace_back(std::move(flags));
master_addrs.emplace_back(a_addr.ToString());
}
if (M > 1) {
const auto flag = Substitute("--master_rpc_proxy_advertised_addresses=$0",
JoinStrings(master_addrs, ","));
for (size_t i = 0; i < M; ++i) {
opts.m_custom_flags[i].push_back(flag);
}
}
for (size_t i = 0; i < T; ++i) {
auto& a_port = t_proxy_advertised_ports_[i];
ASSERT_OK(GetRandomPort(kIpAddr, &a_port));
auto& a_addr = t_proxy_advertised_addrs_[i];
a_addr = HostPort(kIpAddr, a_port);
auto& p_port = t_proxied_ports_[i];
ASSERT_OK(GetRandomPort(kIpAddr, &p_port));
auto& p_addr = t_proxied_addrs_[i];
p_addr = HostPort(kIpAddr, p_port);
vector<string> flags = {
Substitute("--rpc_proxy_advertised_addresses=$0", a_addr.ToString()),
Substitute("--rpc_proxied_addresses=$0", p_addr.ToString()),
};
opts.t_custom_flags.emplace_back(std::move(flags));
}
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
ASSERT_OK(cluster_->Start());
}
void TearDown() override {
if (cluster_) {
cluster_->Shutdown();
}
KuduTest::TearDown();
}
// Verify basic functionality when RPC connections to Kudu masters and tablet
// servers are forwarded via a TCP proxy.
void Run() {
ASSERT_FALSE(nc_.empty());
const auto kTimeout = MonoDelta::FromSeconds(5);
const char* const kTableName = CURRENT_TEST_NAME();
const auto schema = KuduSchema::FromSchema(GetSimpleTestSchema());
TestWorkload w(cluster_.get());
w.set_schema(schema);
w.set_table_name(kTableName);
w.set_num_replicas(1);
w.Setup();
vector<unique_ptr<Fifo>> m_fifos(M);
for (auto i = 0; i < M; ++i) {
const auto fname = Substitute("m.fifo.$0", i);
ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, fname), &m_fifos[i]));
}
vector<unique_ptr<Fifo>> t_fifos(T);
for (auto i = 0; i < T; ++i) {
const auto fname = Substitute("t.fifo.$0", i);
ASSERT_OK(env_->NewFifo(JoinPathSegments(test_dir_, fname), &t_fifos[i]));
}
// Run TCP proxies for Kudu masters' connections.
vector<unique_ptr<Subprocess>> m_proxies;
m_proxies.reserve(M);
vector<ScopedCleanup<function<void(void)>>> m_proxy_cleanups;
m_proxy_cleanups.reserve(M);
for (auto i = 0; i < M; ++i) {
const auto proxy_cmd_str = Substitute(
kProxyCmdPattern,
nc_,
kIpAddr,
m_proxy_advertised_ports_[i],
m_proxied_ports_[i],
m_fifos[i]->filename());
m_proxies.emplace_back(new Subprocess({"/bin/bash", "-c", proxy_cmd_str}));
auto* proxy = m_proxies.back().get();
function<void(void)> cleanup = [proxy] {
if (proxy->IsStarted()) {
WARN_NOT_OK(proxy->KillAndWait(SIGTERM),
Substitute("PID $0: could not stop process", proxy->pid()));
}
};
m_proxy_cleanups.emplace_back(std::move(cleanup));
}
for (auto& p : m_proxies) {
ASSERT_OK(p->Start());
}
// Run TCP proxies for Kudu tablet servers' connections.
vector<unique_ptr<Subprocess>> t_proxies;
t_proxies.reserve(T);
vector<ScopedCleanup<function<void(void)>>> t_proxy_cleanups;
t_proxy_cleanups.reserve(T);
for (auto i = 0; i < T; ++i) {
const auto proxy_cmd_str = Substitute(
kProxyCmdPattern,
nc_,
kIpAddr,
t_proxy_advertised_ports_[i],
t_proxied_ports_[i],
t_fifos[i]->filename());
t_proxies.emplace_back(new Subprocess({"/bin/bash", "-c", proxy_cmd_str}));
auto* proxy = t_proxies.back().get();
function<void(void)> cleanup = [proxy] {
if (proxy->IsStarted()) {
WARN_NOT_OK(proxy->KillAndWait(SIGTERM),
Substitute("PID $0: could not stop process", proxy->pid()));
}
};
t_proxy_cleanups.emplace_back(std::move(cleanup));
}
for (auto& p : t_proxies) {
ASSERT_OK(p->Start());
}
// Wait for the TCP proxies to start up.
for (auto port : m_proxy_advertised_ports_) {
ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, port, kTimeout));
}
for (auto port : t_proxy_advertised_ports_) {
ASSERT_OK(WaitForTcpBindAtPort({ kIpAddr }, port, kTimeout));
}
// Build a client to send requests via RPC endpoints advertised by proxy.
client::sp::shared_ptr<client::KuduClient> client;
{
client::KuduClientBuilder b;
for (auto i = 0; i < M; ++i) {
b.add_master_server_addr(m_proxy_advertised_addrs_[i].ToString());
}
b.default_admin_operation_timeout(kTimeout);
b.default_rpc_timeout(kTimeout);
ASSERT_OK(b.Build(&client));
}
// Make sure the client receives the addresses advertised by proxy since
// the request came through the proxied RPC address.
const vector<string> master_addresses(Split(client->GetMasterAddresses(), ","));
for (const auto& hp : m_proxy_advertised_addrs_) {
ASSERT_TRUE(std::any_of(master_addresses.begin(), master_addresses.end(),
[&hp](const string& e) { return e == hp.ToString(); }));
}
ASSERT_EQ(m_proxy_advertised_addrs_.size(), master_addresses.size());
if (M > 1) {
ASSERT_TRUE(client->IsMultiMaster());
} else {
ASSERT_FALSE(client->IsMultiMaster());
}
// Check that client sees RPC addresses advertised by TCP proxy for the
// tablet server.
vector<KuduTabletServer*> tss;
ElementDeleter deleter(&tss);
ASSERT_OK(client->ListTabletServers(&tss));
ASSERT_EQ(T, tss.size());
for (auto i = 0; i < T; ++i) {
ASSERT_EQ(kIpAddr, tss[i]->hostname());
}
{
set<uint16_t> ports;
for (auto i = 0; i < T; ++i) {
const auto port = tss[i]->port();
ports.emplace(port);
ASSERT_TRUE(std::any_of(t_proxy_advertised_ports_.begin(),
t_proxy_advertised_ports_.end(),
[&port](uint16_t e) { return e == port; }));
}
// Make sure all ports are different.
ASSERT_EQ(T, ports.size());
}
client::sp::shared_ptr<KuduTable> table;
ASSERT_OK(client->OpenTable(kTableName, &table));
// Create a session and explicitly set the flush mode to AUTO_FLUSH_SYNC
// to send every operation when calling Apply().
client::sp::shared_ptr<KuduSession> session(client->NewSession());
ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
ThreadSafeRandom rng(SeedRandom());
for (auto i = 0; i < 10; ++i) {
unique_ptr<KuduInsert> insert(table->NewInsert());
auto* row = insert->mutable_row();
GenerateDataForRow(schema, i, &rng, row);
ASSERT_OK(session->Apply(insert.release()));
}
// Call Flush() just in case, but it's a no-op effectively since the chosen
// session flush mode.
ASSERT_OK(session->Flush());
// Read the data back.
{
KuduScanner scanner(table.get());
ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
ASSERT_OK(scanner.Open());
ASSERT_TRUE(scanner.HasMoreRows());
KuduScanBatch batch;
int32_t idx = 0;
while (scanner.HasMoreRows()) {
ASSERT_OK(scanner.NextBatch(&batch));
for (const auto& row : batch) {
int32_t value;
ASSERT_OK(row.GetInt32(0, &value));
ASSERT_EQ(idx++, value);
}
}
ASSERT_EQ(10, idx);
}
// Make sure the client indeed works through the RPC addresses advertised by
// proxy: stop the proxy and check if client can succeed in writing any data
// to the table.
for (auto i = 0; i < T; ++i) {
t_proxy_cleanups[i].cancel();
ASSERT_OK(t_proxies[i]->KillAndWait(SIGTERM));
}
{
unique_ptr<KuduInsert> insert(table->NewInsert());
auto* row = insert->mutable_row();
GenerateDataForRow(schema, 100, &rng, row);
const auto s = session->Apply(insert.release());
ASSERT_TRUE(s.IsIOError()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "failed to flush data");
}
// Try reading the data now: this expected to fail since the client works
// only through the advertised addresses.
{
KuduScanner scanner(table.get());
ASSERT_OK(scanner.SetTimeoutMillis(kTimeout.ToMilliseconds()));
const auto s = scanner.Open();
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(),
"(timed out after deadline expired|exceeded configured scan timeout)");
}
// Meanwhile, DDL operations should be still possible: connections to
// masters are still being proxied as needed, and masters and tablet servers
// communicate via standard, non-proxied RPC endpoints.
{
unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
alt->AlterColumn("string_val")->RenameTo("str_val");
ASSERT_OK(alt->Alter());
}
// Make sure the client communicates with masters via the advertised
// addresses: once the corresponding TCP proxy is shut down, the client
// should not be able to reach the master to perform a DDL operation.
for (auto i = 0; i < M; ++i) {
m_proxy_cleanups[i].cancel();
ASSERT_OK(m_proxies[i]->KillAndWait(SIGTERM));
}
{
unique_ptr<KuduTableAlterer> alt(client->NewTableAlterer(kTableName));
alt->AlterColumn("str_val")->RenameTo("string_val");
const auto s = alt->Alter();
ASSERT_TRUE(s.IsTimedOut()) << s.ToString();
ASSERT_STR_CONTAINS(s.ToString(), "AlterTable passed its deadline");
ASSERT_STR_CONTAINS(s.ToString(), "Client connection negotiation failed");
}
}
protected:
static constexpr const char* const kIpAddr = "127.0.0.1";
static constexpr const char* const kProxyCmdPattern =
"trap \"kill %1\" EXIT; $0 -knv -l $1 $2 <$4 | $0 -nv $1 $3 >$4";
// Full path to the nc/netcat utility (if present).
string nc_;
array<uint16_t, M> m_proxied_ports_;
array<HostPort, M> m_proxied_addrs_;
array<uint16_t, M> m_proxy_advertised_ports_;
array<HostPort, M> m_proxy_advertised_addrs_;
array<uint16_t, T> t_proxied_ports_;
array<HostPort, T> t_proxied_addrs_;
array<uint16_t, T> t_proxy_advertised_ports_;
array<HostPort, T> t_proxy_advertised_addrs_;
unique_ptr<ExternalMiniCluster> cluster_;
};
typedef ClientProxiedRpcTest<1, 1> ClientProxiedRpc1M1Test;
TEST_F(ClientProxiedRpc1M1Test, Basic) {
NO_FATALS(Run());
}
typedef ClientProxiedRpcTest<1, 3> ClientProxiedRpc1M3Test;
TEST_F(ClientProxiedRpc1M3Test, Basic) {
NO_FATALS(Run());
}
typedef ClientProxiedRpcTest<3, 1> ClientProxiedRpc3M1Test;
TEST_F(ClientProxiedRpc3M1Test, Basic) {
NO_FATALS(Run());
}
typedef ClientProxiedRpcTest<3, 3> ClientProxiedRpc3M3Test;
TEST_F(ClientProxiedRpc3M3Test, Basic) {
NO_FATALS(Run());
}
} // namespace kudu