blob: 8ca917df192281eec260c058fb1f0e3ec8bb22bc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include <cstdint>
#include <memory>
#include <ostream>
#include <unordered_set>
#include <utility>
#include "kudu/client/client.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/master.proxy.h"
#include "kudu/master/mini_master.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
#include "kudu/rpc/messenger.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tablet_server_options.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/env.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/net/socket.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_util.h"
using std::unique_ptr;
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace cluster {
using client::KuduClient;
using client::KuduClientBuilder;
using master::CatalogManager;
using master::MasterServiceProxy;
using master::MiniMaster;
using master::TSDescriptor;
using std::shared_ptr;
using tserver::MiniTabletServer;
using tserver::TabletServer;
using tserver::TabletServerAdminServiceProxy;
using tserver::TabletServerServiceProxy;
InternalMiniClusterOptions::InternalMiniClusterOptions()
: num_masters(1),
supply_single_master_addr(false),
num_tablet_servers(1),
num_data_dirs(1),
bind_mode(kDefaultBindMode) {
}
InternalMiniCluster::InternalMiniCluster(Env* env, InternalMiniClusterOptions options)
: env_(env),
opts_(std::move(options)),
running_(false) {
if (opts_.cluster_root.empty()) {
opts_.cluster_root = JoinPathSegments(GetTestDataDirectory(), "minicluster-data");
}
}
// Tears the cluster down on destruction so tests need not call Shutdown().
InternalMiniCluster::~InternalMiniCluster() {
  this->Shutdown();
}
// Brings the cluster up:
//   1. creates the cluster root directory if it does not exist yet,
//   2. starts the masters,
//   3. adds opts_.num_tablet_servers tablet servers and waits until every
//      running master sees them,
//   4. builds the messenger used by the proxy accessors.
// Marks the cluster as running only if every step succeeds. CHECK-fails if no
// cluster root is set or if the cluster is already running.
Status InternalMiniCluster::Start() {
  CHECK(!opts_.cluster_root.empty()) << "No cluster root was provided";
  CHECK(!running_);

  const string& root_dir = opts_.cluster_root;
  if (!env_->FileExists(root_dir)) {
    RETURN_NOT_OK(env_->CreateDir(root_dir));
  }

  // Masters come first: AddTabletServer() refuses to run without them.
  RETURN_NOT_OK_PREPEND(StartMasters(), "Couldn't start masters");
  for (int ts_idx = 0; ts_idx < opts_.num_tablet_servers; ++ts_idx) {
    RETURN_NOT_OK_PREPEND(AddTabletServer(), Substitute("Error adding TS $0", ts_idx));
  }
  RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(opts_.num_tablet_servers),
                        "Waiting for tablet servers to start");

  rpc::MessengerBuilder messenger_builder("minicluster-messenger");
  messenger_builder.set_num_reactors(1).set_max_negotiation_threads(1);
  RETURN_NOT_OK_PREPEND(messenger_builder.Build(&messenger_),
                        "Failed to start Messenger for minicluster");

  running_ = true;
  return Status::OK();
}
// Starts the cluster's masters.
//
// On the first call (mini_masters_ empty) this reserves one socket per master
// via ReserveDaemonSocket(), records the resulting addresses, and creates one
// MiniMaster per address. On later calls the existing MiniMaster objects are
// reused (ShutdownNodes() leaves mini_masters_ in place), and their count must
// equal opts_.num_masters (CHECKed).
//
// Each master is then started, and the function waits for every catalog
// manager to initialize. For a single-master cluster it also waits up to
// kMasterStartupWaitTimeSeconds for that master to become leader and ready.
Status InternalMiniCluster::StartMasters() {
  int num_masters = opts_.num_masters;
  // Collect and keep alive the set of master sockets bound with SO_REUSEPORT
  // until all masters are started. This allows the mini-cluster to reserve a
  // set of ports up front, then later start the set of masters, each
  // configured with the full set of ports. The reserved sockets are destroyed
  // when this function returns.
  //
  // TODO(dan): re-bind the ports between node restarts in order to prevent other
  // processes from binding to them in the interim.
  vector<unique_ptr<Socket>> reserved_sockets;
  if (mini_masters_.empty()) {
    vector<HostPort> master_rpc_addrs;
    for (int i = 0; i < num_masters; i++) {
      unique_ptr<Socket> reserved_socket;
      RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(DaemonType::MASTER, i, opts_.bind_mode,
                                                &reserved_socket),
                            "failed to reserve master socket address");
      Sockaddr addr;
      RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr));
      master_rpc_addrs.emplace_back(addr.host(), addr.port());
      reserved_sockets.emplace_back(std::move(reserved_socket));
    }
    LOG(INFO) << "Creating distributed mini masters. Addrs: "
              << HostPort::ToCommaSeparatedString(master_rpc_addrs);
    for (int i = 0; i < num_masters; i++) {
      auto mini_master(std::make_shared<MiniMaster>(
          GetMasterFsRoot(i), master_rpc_addrs[i]));
      // A single master receives the master address list only when
      // opts_.supply_single_master_addr asks for it.
      if (num_masters > 1 || opts_.supply_single_master_addr) {
        mini_master->SetMasterAddresses(master_rpc_addrs);
      }
      mini_masters_.emplace_back(std::move(mini_master));
    }
  }
  CHECK_EQ(num_masters, mini_masters_.size());
  for (int i = 0; i < num_masters; i++) {
    RETURN_NOT_OK_PREPEND(mini_masters_[i]->Start(), Substitute("failed to start master $0", i));
    VLOG(1) << "Started MiniMaster with UUID " << mini_masters_[i]->permanent_uuid()
            << " at index " << i;
  }
  // Catalog manager initialization is awaited only after every master has been
  // started. NOTE(review): presumably because a multi-master catalog manager
  // needs its peers running -- confirm.
  for (int i = 0; i < num_masters; i++) {
    LOG(INFO) << "Waiting to initialize catalog manager on master " << i;
    RETURN_NOT_OK_PREPEND(mini_masters_[i]->WaitForCatalogManagerInit(),
                          Substitute("Could not initialize catalog manager on master $0", i));
  }
  if (num_masters == 1) {
    RETURN_NOT_OK(mini_masters_[0]->master()->WaitUntilCatalogManagerIsLeaderAndReadyForTests(
        MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds)));
  }
  return Status::OK();
}
// Like Start(), but additionally blocks until every tablet server reports that
// it has finished starting. The first failure is returned, prefixed with the
// index of the tablet server that failed.
Status InternalMiniCluster::StartSync() {
  RETURN_NOT_OK(Start());
  const int num_ts = mini_tablet_servers_.size();
  for (int ts_idx = 0; ts_idx < num_ts; ts_idx++) {
    RETURN_NOT_OK_PREPEND(mini_tablet_servers_[ts_idx]->WaitStarted(),
                          Substitute("TabletServer $0 failed to start.", ts_idx));
  }
  return Status::OK();
}
// Creates, configures and starts one more tablet server. The new server's index
// is the number of tablet servers already present. It is appended to
// mini_tablet_servers_ only if it starts successfully.
//
// Returns IllegalState if no masters have been created yet; otherwise returns
// the status of MiniTabletServer::Start().
Status InternalMiniCluster::AddTabletServer() {
  if (mini_masters_.empty()) {
    return Status::IllegalState("Master not yet initialized");
  }
  const int new_idx = mini_tablet_servers_.size();

  // Use the caller-requested RPC port for this index if one was supplied,
  // otherwise port 0.
  uint16_t ts_rpc_port = 0;
  if (static_cast<size_t>(new_idx) < opts_.tserver_rpc_ports.size()) {
    ts_rpc_port = opts_.tserver_rpc_ports[new_idx];
  }
  const string bind_ip =
      GetBindIpForDaemonWithType(MiniCluster::TSERVER, new_idx, opts_.bind_mode);

  // Construct directly into a shared_ptr; no naked new / release() hand-off.
  auto tablet_server = std::make_shared<MiniTabletServer>(
      GetTabletServerFsRoot(new_idx),
      HostPort(bind_ip, ts_rpc_port),
      opts_.num_data_dirs);

  // Point the new tablet server at every master in the cluster.
  tablet_server->options()->master_addresses = master_rpc_addrs();
  RETURN_NOT_OK(tablet_server->Start());
  mini_tablet_servers_.emplace_back(std::move(tablet_server));
  return Status::OK();
}
// Shuts down the node groups selected by 'nodes' and marks the cluster as not
// running. Tablet servers are removed from mini_tablet_servers_. Master objects
// are kept, so StartMasters() can start the same instances again.
void InternalMiniCluster::ShutdownNodes(ClusterNodes nodes) {
  const bool stop_tservers = nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY;
  const bool stop_masters = nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY;

  if (stop_tservers) {
    for (const auto& ts : mini_tablet_servers_) {
      ts->Shutdown();
    }
    mini_tablet_servers_.clear();
  }
  if (stop_masters) {
    for (const auto& master : mini_masters_) {
      master->Shutdown();
    }
  }
  running_ = false;
}
// Returns a non-owning pointer to the master at 'idx'. CHECK-fails when 'idx'
// is outside [0, number of masters).
MiniMaster* InternalMiniCluster::mini_master(int idx) const {
  CHECK_GE(idx, 0) << "Master idx must be >= 0";
  CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started";
  const shared_ptr<MiniMaster>& owned = mini_masters_[idx];
  return owned.get();
}
// Returns a non-owning pointer to the tablet server at 'idx'. CHECK-fails when
// 'idx' is outside [0, number of tablet servers).
//
// The upper-bound message previously named 'num_ts_started_', which this check
// does not use. The bound is mini_tablet_servers_.size(), so the message now
// says that, worded like mini_master()'s.
MiniTabletServer* InternalMiniCluster::mini_tablet_server(int idx) const {
  CHECK_GE(idx, 0) << "TabletServer idx must be >= 0";
  CHECK_LT(idx, mini_tablet_servers_.size())
      << "TabletServer idx must be < num tablet servers started";
  return mini_tablet_servers_[idx].get();
}
// Returns the first tablet server whose uuid() equals 'uuid', or nullptr if
// there is none.
MiniTabletServer* InternalMiniCluster::mini_tablet_server_by_uuid(const string& uuid) const {
  const int found_idx = tablet_server_index_by_uuid(uuid);
  return found_idx == -1 ? nullptr : mini_tablet_servers_[found_idx].get();
}
// Returns the index of the first tablet server whose uuid() equals 'uuid', or
// -1 if there is none.
int InternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const {
  int position = 0;
  for (const auto& ts : mini_tablet_servers_) {
    if (ts->uuid() == uuid) {
      return position;
    }
    ++position;
  }
  return -1;
}
// Returns the bound RPC address of every master, in master-index order.
vector<HostPort> InternalMiniCluster::master_rpc_addrs() const {
  vector<HostPort> addrs;
  addrs.reserve(mini_masters_.size());
  for (const auto& mm : mini_masters_) {
    const Sockaddr bound = mm->bound_rpc_addr();
    addrs.emplace_back(bound.host(), bound.port());
  }
  return addrs;
}
// Returns "<cluster_root>/master-<idx>-root".
string InternalMiniCluster::GetMasterFsRoot(int idx) const {
  const string dir_name = Substitute("master-$0-root", idx);
  return JoinPathSegments(opts_.cluster_root, dir_name);
}
// Returns "<cluster_root>/ts-<idx>-root".
string InternalMiniCluster::GetTabletServerFsRoot(int idx) const {
  const string dir_name = Substitute("ts-$0-root", idx);
  return JoinPathSegments(opts_.cluster_root, dir_name);
}
// Flushes every replica of 'tablet_id' hosted by any tablet server in the
// cluster. Returns the first flush error. Returns NotFound when no tablet
// server hosts a replica of the tablet.
Status InternalMiniCluster::FlushTablet(const string& tablet_id) {
  int flushed_replicas = 0;
  for (const auto& ts : mini_tablet_servers_) {
    scoped_refptr<tablet::TabletReplica> replica;
    if (!ts->server()->tablet_manager()->LookupTablet(tablet_id, &replica)) {
      continue;
    }
    RETURN_NOT_OK(replica->tablet()->Flush());
    ++flushed_replicas;
  }
  if (flushed_replicas == 0) {
    return Status::NotFound(Substitute("Tablet $0 not found on any of the tablet servers",
                                       tablet_id));
  }
  return Status::OK();
}
// Waits until every running master sees exactly 'count' tablet servers that
// match this cluster's live servers. The descriptors are discarded.
Status InternalMiniCluster::WaitForTabletServerCount(int count) const {
  vector<shared_ptr<TSDescriptor>> ignored_descs;
  return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &ignored_descs);
}
// Polls every master that is not shut down until each one reports exactly
// 'count' tablet server descriptors, or kRegistrationWaitTimeSeconds elapse.
//
// With MATCH_TSERVERS, a descriptor counts only if its uuid and latest seqno
// equal those of a tablet server in mini_tablet_servers_. With
// DO_NOT_MATCH_TSERVERS, every descriptor counts.
//
// '*descs' is overwritten on every poll; on success it holds the descriptors
// from the last master that was checked. Returns TimedOut if some master never
// reached 'count'.
Status InternalMiniCluster::WaitForTabletServerCount(int count,
                                                     MatchMode mode,
                                                     vector<shared_ptr<TSDescriptor>>* descs) const {
  // Indexes of running masters that have not yet reported 'count' servers.
  std::unordered_set<int> pending;
  for (int m = 0; m < num_masters(); m++) {
    if (mini_master(m)->master()->IsShutdown()) {
      continue;
    }
    pending.insert(m);
  }

  // Counts the entries of '*descs' that satisfy 'mode'.
  const auto count_matching_descs = [&]() {
    int matched = 0;
    switch (mode) {
      case MatchMode::MATCH_TSERVERS:
        // GetAllDescriptors() may return servers that are no longer online,
        // so only count descriptors that line up (uuid and seqno) with a
        // tablet server that is part of this cluster.
        for (const shared_ptr<TSDescriptor>& desc : *descs) {
          for (const auto& mts : mini_tablet_servers_) {
            const TabletServer* server = mts->server();
            const bool same_instance =
                server->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
                server->instance_pb().instance_seqno() == desc->latest_seqno();
            if (same_instance) {
              matched++;
              break;
            }
          }
        }
        break;
      case MatchMode::DO_NOT_MATCH_TSERVERS:
        matched = descs->size();
        break;
      default:
        LOG(FATAL) << "Invalid match mode";
    }
    return matched;
  };

  Stopwatch sw;
  sw.start();
  while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
    auto it = pending.begin();
    while (it != pending.end()) {
      mini_master(*it)->master()->ts_manager()->GetAllDescriptors(descs);
      if (count_matching_descs() == count) {
        // This master already sees the expected set of tablet servers.
        it = pending.erase(it);
      } else {
        ++it;
      }
    }
    if (pending.empty()) {
      LOG(INFO) << Substitute("$0 TS(s) registered with all masters after $1s",
                              count, sw.elapsed().wall_seconds());
      return Status::OK();
    }
    SleepFor(MonoDelta::FromMilliseconds(1));
  }
  return Status::TimedOut(Substitute(
      "Timed out waiting for $0 TS(s) to register with all masters", count));
}
// Builds a client connected to every master of this cluster. If 'builder' is
// null, a default-constructed builder is used. In either case its master
// address list is replaced by this cluster's masters.
Status InternalMiniCluster::CreateClient(KuduClientBuilder* builder,
                                         client::sp::shared_ptr<KuduClient>* client) const {
  KuduClientBuilder defaults;
  KuduClientBuilder* const effective = builder != nullptr ? builder : &defaults;
  effective->clear_master_server_addrs();
  for (const auto& mm : mini_masters_) {
    CHECK(mm);
    effective->add_master_server_addr(mm->bound_rpc_addr_str());
  }
  return effective->Build(client);
}
// Finds the master whose catalog manager currently holds leadership. The
// search polls every 100 ms for up to kMasterStartupWaitTimeSeconds. On
// success, stores the index in '*idx' when 'idx' is non-null. Returns NotFound
// if no leader appeared before the deadline.
Status InternalMiniCluster::GetLeaderMasterIndex(int* idx) const {
  // One polling pass: returns the index of the first started, running master
  // whose leader lock acquires with an OK status, or -1 if there is none.
  const auto find_leader_once = [this]() {
    for (int i = 0; i < num_masters(); i++) {
      MiniMaster* mm = mini_master(i);
      if (!mm->is_started() || mm->master()->IsShutdown()) {
        continue;
      }
      CatalogManager::ScopedLeaderSharedLock leader_lock(mm->master()->catalog_manager());
      if (leader_lock.first_failed_status().ok()) {
        return i;
      }
    }
    return -1;
  };

  const MonoTime deadline =
      MonoTime::Now() + MonoDelta::FromSeconds(kMasterStartupWaitTimeSeconds);
  int leader_idx = -1;
  while (MonoTime::Now() < deadline) {
    leader_idx = find_leader_once();
    if (leader_idx != -1) {
      break;
    }
    SleepFor(MonoDelta::FromMilliseconds(100));
  }

  if (leader_idx == -1) {
    return Status::NotFound("Leader master was not found within deadline");
  }
  if (idx != nullptr) {
    *idx = leader_idx;
  }
  return Status::OK();
}
std::shared_ptr<rpc::Messenger> InternalMiniCluster::messenger() const {
return messenger_;
}
// Convenience overload for single-master clusters. CHECK-fails if the cluster
// has any other number of masters.
std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy() const {
  CHECK_EQ(1, mini_masters_.size());
  const int only_master_idx = 0;
  return master_proxy(only_master_idx);
}
std::shared_ptr<MasterServiceProxy> InternalMiniCluster::master_proxy(int idx) const {
const auto& addr = CHECK_NOTNULL(mini_master(idx))->bound_rpc_addr();
return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host());
}
std::shared_ptr<TabletServerServiceProxy> InternalMiniCluster::tserver_proxy(int idx) const {
const auto& addr = CHECK_NOTNULL(mini_tablet_server(idx))->bound_rpc_addr();
return std::make_shared<TabletServerServiceProxy>(messenger_, addr, addr.host());
}
std::shared_ptr<TabletServerAdminServiceProxy> InternalMiniCluster::tserver_admin_proxy(
int idx) const {
const auto& addr = CHECK_NOTNULL(mini_tablet_server(idx))->bound_rpc_addr();
return std::make_shared<TabletServerAdminServiceProxy>(messenger_, addr, addr.host());
}
// Returns the WAL root configured in the options of the tablet server at
// 'ts_idx'.
string InternalMiniCluster::WalRootForTS(int ts_idx) const {
  MiniTabletServer* const ts = mini_tablet_server(ts_idx);
  return ts->options()->fs_opts.wal_root;
}
// Returns the uuid of the tablet server at 'ts_idx'.
string InternalMiniCluster::UuidForTS(int ts_idx) const {
  MiniTabletServer* const ts = mini_tablet_server(ts_idx);
  return ts->uuid();
}
} // namespace cluster
} // namespace kudu