blob: 584822fe809ad067e8e0ee1ee1782d7d2f0a4d6a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/integration-tests/mini_cluster.h"
#include "kudu/client/client.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/master/catalog_manager.h"
#include "kudu/master/master.h"
#include "kudu/master/mini_master.h"
#include "kudu/master/ts_descriptor.h"
#include "kudu/master/ts_manager.h"
#include "kudu/rpc/messenger.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_util.h"
using strings::Substitute;
namespace kudu {
using client::KuduClient;
using client::KuduClientBuilder;
using master::MiniMaster;
using master::TabletLocationsPB;
using master::TSDescriptor;
using std::shared_ptr;
using tserver::MiniTabletServer;
using tserver::TabletServer;
MiniClusterOptions::MiniClusterOptions()
: num_masters(1),
num_tablet_servers(1) {
}
MiniCluster::MiniCluster(Env* env, const MiniClusterOptions& options)
: running_(false),
env_(env),
fs_root_(!options.data_root.empty() ? options.data_root :
JoinPathSegments(GetTestDataDirectory(), "minicluster-data")),
num_masters_initial_(options.num_masters),
num_ts_initial_(options.num_tablet_servers),
master_rpc_ports_(options.master_rpc_ports),
tserver_rpc_ports_(options.tserver_rpc_ports) {
mini_masters_.resize(num_masters_initial_);
}
MiniCluster::~MiniCluster() {
CHECK(!running_);
}
Status MiniCluster::Start() {
CHECK(!fs_root_.empty()) << "No Fs root was provided";
CHECK(!running_);
if (num_masters_initial_ > 1) {
CHECK_GE(master_rpc_ports_.size(), num_masters_initial_);
}
if (!env_->FileExists(fs_root_)) {
RETURN_NOT_OK(env_->CreateDir(fs_root_));
}
// start the masters
if (num_masters_initial_ > 1) {
RETURN_NOT_OK_PREPEND(StartDistributedMasters(),
"Couldn't start distributed masters");
} else {
RETURN_NOT_OK_PREPEND(StartSingleMaster(), "Couldn't start the single master");
}
for (int i = 0; i < num_ts_initial_; i++) {
RETURN_NOT_OK_PREPEND(AddTabletServer(),
Substitute("Error adding TS $0", i));
}
RETURN_NOT_OK_PREPEND(WaitForTabletServerCount(num_ts_initial_),
"Waiting for tablet servers to start");
running_ = true;
return Status::OK();
}
Status MiniCluster::StartDistributedMasters() {
CHECK_GE(master_rpc_ports_.size(), num_masters_initial_);
CHECK_GT(master_rpc_ports_.size(), 1);
LOG(INFO) << "Creating distributed mini masters. Ports: "
<< JoinInts(master_rpc_ports_, ", ");
for (int i = 0; i < num_masters_initial_; i++) {
gscoped_ptr<MiniMaster> mini_master(
new MiniMaster(env_, GetMasterFsRoot(i), master_rpc_ports_[i]));
RETURN_NOT_OK_PREPEND(mini_master->StartDistributedMaster(master_rpc_ports_),
Substitute("Couldn't start follower $0", i));
VLOG(1) << "Started MiniMaster with UUID " << mini_master->permanent_uuid()
<< " at index " << i;
mini_masters_[i] = shared_ptr<MiniMaster>(mini_master.release());
}
int i = 0;
for (const shared_ptr<MiniMaster>& master : mini_masters_) {
LOG(INFO) << "Waiting to initialize catalog manager on master " << i++;
RETURN_NOT_OK_PREPEND(master->WaitForCatalogManagerInit(),
Substitute("Could not initialize catalog manager on master $0", i));
}
return Status::OK();
}
Status MiniCluster::StartSync() {
RETURN_NOT_OK(Start());
int count = 0;
for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
RETURN_NOT_OK_PREPEND(tablet_server->WaitStarted(),
Substitute("TabletServer $0 failed to start.", count));
count++;
}
return Status::OK();
}
Status MiniCluster::StartSingleMaster() {
// If there's a single master, 'mini_masters_' must be size 1.
CHECK_EQ(mini_masters_.size(), 1);
CHECK_LE(master_rpc_ports_.size(), 1);
uint16_t master_rpc_port = 0;
if (master_rpc_ports_.size() == 1) {
master_rpc_port = master_rpc_ports_[0];
}
// start the master (we need the port to set on the servers).
gscoped_ptr<MiniMaster> mini_master(
new MiniMaster(env_, GetMasterFsRoot(0), master_rpc_port));
RETURN_NOT_OK_PREPEND(mini_master->Start(), "Couldn't start master");
RETURN_NOT_OK(mini_master->master()->
WaitUntilCatalogManagerIsLeaderAndReadyForTests(MonoDelta::FromSeconds(5)));
mini_masters_[0] = shared_ptr<MiniMaster>(mini_master.release());
return Status::OK();
}
Status MiniCluster::AddTabletServer() {
if (mini_masters_.empty()) {
return Status::IllegalState("Master not yet initialized");
}
int new_idx = mini_tablet_servers_.size();
uint16_t ts_rpc_port = 0;
if (tserver_rpc_ports_.size() > new_idx) {
ts_rpc_port = tserver_rpc_ports_[new_idx];
}
gscoped_ptr<MiniTabletServer> tablet_server(
new MiniTabletServer(GetTabletServerFsRoot(new_idx), ts_rpc_port));
// set the master addresses
tablet_server->options()->master_addresses.clear();
for (const shared_ptr<MiniMaster>& master : mini_masters_) {
tablet_server->options()->master_addresses.push_back(HostPort(master->bound_rpc_addr()));
}
RETURN_NOT_OK(tablet_server->Start())
mini_tablet_servers_.push_back(shared_ptr<MiniTabletServer>(tablet_server.release()));
return Status::OK();
}
MiniMaster* MiniCluster::leader_mini_master() {
Stopwatch sw;
sw.start();
while (sw.elapsed().wall_seconds() < kMasterLeaderElectionWaitTimeSeconds) {
for (int i = 0; i < mini_masters_.size(); i++) {
MiniMaster* master = mini_master(i);
if (master->master()->IsShutdown()) {
continue;
}
if (master->master()->catalog_manager()->IsInitialized() &&
master->master()->catalog_manager()->CheckIsLeaderAndReady().ok()) {
return master;
}
}
SleepFor(MonoDelta::FromMilliseconds(1));
}
LOG(ERROR) << "No leader master elected after " << kMasterLeaderElectionWaitTimeSeconds
<< " seconds.";
return nullptr;
}
void MiniCluster::Shutdown() {
for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
tablet_server->Shutdown();
}
mini_tablet_servers_.clear();
for (shared_ptr<MiniMaster>& master_server : mini_masters_) {
master_server->Shutdown();
master_server.reset();
}
running_ = false;
}
void MiniCluster::ShutdownMasters() {
for (shared_ptr<MiniMaster>& master_server : mini_masters_) {
master_server->Shutdown();
master_server.reset();
}
}
MiniMaster* MiniCluster::mini_master(int idx) {
CHECK_GE(idx, 0) << "Master idx must be >= 0";
CHECK_LT(idx, mini_masters_.size()) << "Master idx must be < num masters started";
return mini_masters_[idx].get();
}
MiniTabletServer* MiniCluster::mini_tablet_server(int idx) {
CHECK_GE(idx, 0) << "TabletServer idx must be >= 0";
CHECK_LT(idx, mini_tablet_servers_.size()) << "TabletServer idx must be < 'num_ts_started_'";
return mini_tablet_servers_[idx].get();
}
string MiniCluster::GetMasterFsRoot(int idx) {
return JoinPathSegments(fs_root_, Substitute("master-$0-root", idx));
}
string MiniCluster::GetTabletServerFsRoot(int idx) {
return JoinPathSegments(fs_root_, Substitute("ts-$0-root", idx));
}
Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
int expected_count) {
TabletLocationsPB locations;
return WaitForReplicaCount(tablet_id, expected_count, &locations);
}
Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
int expected_count,
TabletLocationsPB* locations) {
Stopwatch sw;
sw.start();
while (sw.elapsed().wall_seconds() < kTabletReportWaitTimeSeconds) {
Status s =
leader_mini_master()->master()->catalog_manager()->GetTabletLocations(tablet_id, locations);
if (s.ok() && ((locations->stale() && expected_count == 0) ||
(!locations->stale() && locations->replicas_size() == expected_count))) {
return Status::OK();
}
SleepFor(MonoDelta::FromMilliseconds(1));
}
return Status::TimedOut(Substitute("Tablet $0 never reached expected replica count $1",
tablet_id, expected_count));
}
Status MiniCluster::WaitForTabletServerCount(int count) {
vector<shared_ptr<master::TSDescriptor> > descs;
return WaitForTabletServerCount(count, &descs);
}
Status MiniCluster::WaitForTabletServerCount(int count,
vector<shared_ptr<TSDescriptor> >* descs) {
Stopwatch sw;
sw.start();
while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
leader_mini_master()->master()->ts_manager()->GetAllDescriptors(descs);
if (descs->size() == count) {
// GetAllDescriptors() may return servers that are no longer online.
// Do a second step of verification to verify that the descs that we got
// are aligned (same uuid/seqno) with the TSs that we have in the cluster.
int match_count = 0;
for (const shared_ptr<TSDescriptor>& desc : *descs) {
for (auto mini_tablet_server : mini_tablet_servers_) {
auto ts = mini_tablet_server->server();
if (ts->instance_pb().permanent_uuid() == desc->permanent_uuid() &&
ts->instance_pb().instance_seqno() == desc->latest_seqno()) {
match_count++;
break;
}
}
}
if (match_count == count) {
LOG(INFO) << count << " TS(s) registered with Master after "
<< sw.elapsed().wall_seconds() << "s";
return Status::OK();
}
}
SleepFor(MonoDelta::FromMilliseconds(1));
}
return Status::TimedOut(Substitute("$0 TS(s) never registered with master", count));
}
Status MiniCluster::CreateClient(KuduClientBuilder* builder,
client::sp::shared_ptr<KuduClient>* client) {
KuduClientBuilder default_builder;
if (builder == nullptr) {
builder = &default_builder;
}
builder->clear_master_server_addrs();
for (const shared_ptr<MiniMaster>& master : mini_masters_) {
CHECK(master);
builder->add_master_server_addr(master->bound_rpc_addr_str());
}
return builder->Build(client);
}
} // namespace kudu