master: additional leader lock assertions in catalog manager
I went through the catalog manager entry points and added leader lock
assertions where necessary, updating tests as needed.
I also snuck in a couple cluster fixes:
1. MiniCluster::leader_mini_master() was unsafe because it didn't pass the
(now held) leader lock back to the caller. It's only used in a few places
though, so I removed it outright rather than fix it.
2. WaitForTabletServerCount() has been updated for both kinds of clusters.
The new version waits for the correct count on every master, a necessary
change now that tservers heartbeat to every master. Without this, we may
stop waiting when the master that has seen all tservers was a follower
and fail a subsequent CreateTable. The new version also ignore masters
that have been shut down. This isn't essential, but good future-proofing.
Change-Id: I5bb2f5067cdbdd93900a80255def65a26216f6ea
Reviewed-on: http://gerrit.cloudera.org:8080/3684
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <todd@apache.org>
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index dd242d6..f9d1d31 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -2318,8 +2318,13 @@
ASSERT_EQ(kRenamedTableName, tablet_peer->tablet()->metadata()->table_name());
CatalogManager *catalog_manager = cluster_->mini_master()->master()->catalog_manager();
- ASSERT_TRUE(catalog_manager->TableNameExists(kRenamedTableName));
- ASSERT_FALSE(catalog_manager->TableNameExists(kTableName));
+ CatalogManager::ScopedLeaderSharedLock l(catalog_manager);
+ ASSERT_OK(l.first_failed_status());
+ bool exists;
+ ASSERT_OK(catalog_manager->TableNameExists(kRenamedTableName, &exists));
+ ASSERT_TRUE(exists);
+ ASSERT_OK(catalog_manager->TableNameExists(kTableName, &exists));
+ ASSERT_FALSE(exists);
}
}
@@ -2338,7 +2343,13 @@
string tablet_id = GetFirstTabletId(client_table_.get());
ASSERT_OK(client_->DeleteTable(kTableName));
CatalogManager *catalog_manager = cluster_->mini_master()->master()->catalog_manager();
- ASSERT_FALSE(catalog_manager->TableNameExists(kTableName));
+ {
+ CatalogManager::ScopedLeaderSharedLock l(catalog_manager);
+ ASSERT_OK(l.first_failed_status());
+ bool exists;
+ ASSERT_OK(catalog_manager->TableNameExists(kTableName, &exists));
+ ASSERT_FALSE(exists);
+ }
// Wait until the table is removed from the TS
int wait_time = 1000;
diff --git a/src/kudu/integration-tests/external_mini_cluster.cc b/src/kudu/integration-tests/external_mini_cluster.cc
index a7300df..1b85fb8 100644
--- a/src/kudu/integration-tests/external_mini_cluster.cc
+++ b/src/kudu/integration-tests/external_mini_cluster.cc
@@ -22,6 +22,7 @@
#include <memory>
#include <rapidjson/document.h>
#include <string>
+#include <unordered_set>
#include "kudu/client/client.h"
#include "kudu/common/wire_protocol.h"
@@ -59,6 +60,7 @@
using rapidjson::Value;
using std::string;
using std::unique_ptr;
+using std::unordered_set;
using strings::Substitute;
typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;
@@ -290,18 +292,26 @@
MonoTime deadline = MonoTime::Now(MonoTime::FINE);
deadline.AddDelta(timeout);
+ unordered_set<int> masters_to_search;
+ for (int i = 0; i < masters_.size(); i++) {
+ if (!masters_[i]->IsShutdown()) {
+ masters_to_search.insert(i);
+ }
+ }
+
while (true) {
MonoDelta remaining = deadline.GetDeltaSince(MonoTime::Now(MonoTime::FINE));
if (remaining.ToSeconds() < 0) {
- return Status::TimedOut(Substitute("$0 TS(s) never registered with master", count));
+ return Status::TimedOut(Substitute(
+ "Timed out waiting for $0 TS(s) to register with all masters", count));
}
- for (int i = 0; i < masters_.size(); i++) {
+ for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
master::ListTabletServersRequestPB req;
master::ListTabletServersResponsePB resp;
rpc::RpcController rpc;
rpc.set_timeout(remaining);
- RETURN_NOT_OK_PREPEND(master_proxy(i)->ListTabletServers(req, &resp, &rpc),
+ RETURN_NOT_OK_PREPEND(master_proxy(*iter)->ListTabletServers(req, &resp, &rpc),
"ListTabletServers RPC failed");
// ListTabletServers() may return servers that are no longer online.
// Do a second step of verification to verify that the descs that we got
@@ -317,10 +327,18 @@
}
}
if (match_count == count) {
- LOG(INFO) << count << " TS(s) registered with Master";
- return Status::OK();
+ // This master has returned the correct set of tservers.
+ iter = masters_to_search.erase(iter);
+ } else {
+ iter++;
}
}
+
+ if (masters_to_search.empty()) {
+ // All masters have returned the correct set of tservers.
+ LOG(INFO) << count << " TS(s) registered with all masters";
+ return Status::OK();
+ }
SleepFor(MonoDelta::FromMilliseconds(1));
}
}
diff --git a/src/kudu/integration-tests/external_mini_cluster.h b/src/kudu/integration-tests/external_mini_cluster.h
index 9bf4c1d..4573b7e 100644
--- a/src/kudu/integration-tests/external_mini_cluster.h
+++ b/src/kudu/integration-tests/external_mini_cluster.h
@@ -222,10 +222,9 @@
// master at 'idx' is running.
std::shared_ptr<master::MasterServiceProxy> master_proxy(int idx);
- // Wait until the number of registered tablet servers reaches the
- // given count on at least one of the running masters. Returns
- // Status::TimedOut if the desired count is not achieved with the
- // given timeout.
+ // Wait until the number of registered tablet servers reaches the given count
+ // on all of the running masters. Returns Status::TimedOut if the desired
+ // count is not achieved with the given timeout.
Status WaitForTabletServerCount(int count, const MonoDelta& timeout);
// Runs gtest assertions that no servers have crashed.
diff --git a/src/kudu/integration-tests/master_replication-itest.cc b/src/kudu/integration-tests/master_replication-itest.cc
index 47e47ea..1307014 100644
--- a/src/kudu/integration-tests/master_replication-itest.cc
+++ b/src/kudu/integration-tests/master_replication-itest.cc
@@ -124,12 +124,6 @@
.Create();
}
- void VerifyTableExists(const std::string& table_id) {
- LOG(INFO) << "Verifying that " << table_id << " exists on leader..";
- ASSERT_TRUE(cluster_->leader_mini_master()->master()
- ->catalog_manager()->TableNameExists(table_id));
- }
-
protected:
int num_masters_;
MiniClusterOptions opts_;
@@ -154,7 +148,25 @@
// Repeat the same for the second table.
ASSERT_OK(CreateTable(client, kTableId2));
- ASSERT_NO_FATAL_FAILURE(VerifyTableExists(kTableId2));
+
+ // Verify that both tables exist. There can be a leader election at any time
+ // so we need to loop and try all masters.
+ while (true) {
+ for (int i = 0; i < cluster_->num_masters(); i++) {
+ CatalogManager* catalog =
+ cluster_->mini_master(i)->master()->catalog_manager();
+ CatalogManager::ScopedLeaderSharedLock l(catalog);
+ if (l.first_failed_status().ok()) {
+ bool exists;
+ ASSERT_OK(catalog->TableNameExists(kTableId1, &exists));
+ ASSERT_TRUE(exists);
+ ASSERT_OK(catalog->TableNameExists(kTableId2, &exists));
+ ASSERT_TRUE(exists);
+ return;
+ }
+ }
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }
}
// When all masters are down, test that we can timeout the connection
diff --git a/src/kudu/integration-tests/mini_cluster.cc b/src/kudu/integration-tests/mini_cluster.cc
index 62e4531..78646ee 100644
--- a/src/kudu/integration-tests/mini_cluster.cc
+++ b/src/kudu/integration-tests/mini_cluster.cc
@@ -179,28 +179,6 @@
return Status::OK();
}
-MiniMaster* MiniCluster::leader_mini_master() {
- Stopwatch sw;
- sw.start();
- while (sw.elapsed().wall_seconds() < kMasterLeaderElectionWaitTimeSeconds) {
- for (int i = 0; i < mini_masters_.size(); i++) {
- MiniMaster* master = mini_master(i);
- if (master->master()->IsShutdown()) {
- continue;
- }
- CatalogManager::ScopedLeaderSharedLock l(
- master->master()->catalog_manager());
- if (l.first_failed_status().ok()) {
- return master;
- }
- }
- SleepFor(MonoDelta::FromMilliseconds(1));
- }
- LOG(ERROR) << "No leader master elected after " << kMasterLeaderElectionWaitTimeSeconds
- << " seconds.";
- return nullptr;
-}
-
void MiniCluster::Shutdown() {
for (const shared_ptr<MiniTabletServer>& tablet_server : mini_tablet_servers_) {
tablet_server->Shutdown();
@@ -240,29 +218,6 @@
return JoinPathSegments(fs_root_, Substitute("ts-$0-root", idx));
}
-Status MiniCluster::WaitForReplicaCount(const string& tablet_id,
- int expected_count,
- TabletLocationsPB* locations) {
- Stopwatch sw;
- sw.start();
- while (sw.elapsed().wall_seconds() < kTabletReportWaitTimeSeconds) {
- CatalogManager* catalog = leader_mini_master()->master()->catalog_manager();
- Status s;
- {
- CatalogManager::ScopedLeaderSharedLock l(catalog);
- RETURN_NOT_OK(l.first_failed_status());
- s = catalog->GetTabletLocations(tablet_id, locations);
- }
- if (s.ok() && locations->replicas_size() == expected_count) {
- return Status::OK();
- }
-
- SleepFor(MonoDelta::FromMilliseconds(1));
- }
- return Status::TimedOut(Substitute("Tablet $0 never reached expected replica count $1",
- tablet_id, expected_count));
-}
-
Status MiniCluster::WaitForTabletServerCount(int count) {
vector<shared_ptr<master::TSDescriptor>> descs;
return WaitForTabletServerCount(count, MatchMode::MATCH_TSERVERS, &descs);
@@ -271,14 +226,18 @@
Status MiniCluster::WaitForTabletServerCount(int count,
MatchMode mode,
vector<shared_ptr<TSDescriptor>>* descs) {
+ unordered_set<int> masters_to_search;
+ for (int i = 0; i < num_masters(); i++) {
+ if (!mini_master(i)->master()->IsShutdown()) {
+ masters_to_search.insert(i);
+ }
+ }
+
Stopwatch sw;
sw.start();
while (sw.elapsed().wall_seconds() < kRegistrationWaitTimeSeconds) {
- leader_mini_master()->master()->ts_manager()->GetAllDescriptors(descs);
- if (descs->size() == count) {
- // GetAllDescriptors() may return servers that are no longer online.
- // Do a second step of verification to verify that the descs that we got
- // are aligned (same uuid/seqno) with the TSs that we have in the cluster.
+ for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) {
+ mini_master(*iter)->master()->ts_manager()->GetAllDescriptors(descs);
int match_count = 0;
switch (mode) {
case MatchMode::MATCH_TSERVERS:
@@ -304,14 +263,22 @@
}
if (match_count == count) {
- LOG(INFO) << count << " TS(s) registered with Master after "
- << sw.elapsed().wall_seconds() << "s";
- return Status::OK();
+ // This master has returned the correct set of tservers.
+ iter = masters_to_search.erase(iter);
+ } else {
+ iter++;
}
}
+ if (masters_to_search.empty()) {
+ // All masters have returned the correct set of tservers.
+ LOG(INFO) << Substitute("$0 TS(s) registered with all masters after $1s",
+ count, sw.elapsed().wall_seconds());
+ return Status::OK();
+ }
SleepFor(MonoDelta::FromMilliseconds(1));
}
- return Status::TimedOut(Substitute("$0 TS(s) never registered with master", count));
+ return Status::TimedOut(Substitute(
+ "Timed out waiting for $0 TS(s) to register with all masters", count));
}
Status MiniCluster::CreateClient(KuduClientBuilder* builder,
diff --git a/src/kudu/integration-tests/mini_cluster.h b/src/kudu/integration-tests/mini_cluster.h
index 12a5211..8e64b3f 100644
--- a/src/kudu/integration-tests/mini_cluster.h
+++ b/src/kudu/integration-tests/mini_cluster.h
@@ -109,10 +109,6 @@
return mini_master(0);
}
- // Returns the leader Master for this MiniCluster or NULL if none can be
- // found. May block until a leader Master is ready.
- master::MiniMaster* leader_mini_master();
-
// Returns the Master at index 'idx' for this MiniCluster.
master::MiniMaster* mini_master(int idx);
@@ -129,18 +125,9 @@
std::string GetTabletServerFsRoot(int idx);
- // Wait for the given tablet to have 'expected_count' replicas
- // reported on the master. Returns the locations in '*locations'.
- // Requires that the master has started;
- // Returns a bad Status if the tablet does not reach the required count
- // within kTabletReportWaitTimeSeconds.
- Status WaitForReplicaCount(const std::string& tablet_id,
- int expected_count,
- master::TabletLocationsPB* locations);
-
// Wait until the number of registered tablet servers reaches the given
- // count. Returns Status::TimedOut if the desired count is not achieved
- // within kRegistrationWaitTimeSeconds.
+ // count on all masters. Returns Status::TimedOut if the desired count is not
+ // achieved within kRegistrationWaitTimeSeconds.
enum class MatchMode {
// Ensure that the tservers retrieved from each master match up against the
// tservers defined in this cluster. The matching is done via
@@ -168,9 +155,7 @@
private:
enum {
- kTabletReportWaitTimeSeconds = 5,
kRegistrationWaitTimeSeconds = 15,
- kMasterLeaderElectionWaitTimeSeconds = 10
};
bool running_;
diff --git a/src/kudu/integration-tests/registration-test.cc b/src/kudu/integration-tests/registration-test.cc
index 9eeca5f..57a2c70 100644
--- a/src/kudu/integration-tests/registration-test.cc
+++ b/src/kudu/integration-tests/registration-test.cc
@@ -88,6 +88,25 @@
ASSERT_STR_CONTAINS(buf.ToString(), VersionInfo::GetShortVersionString());
}
+
+ Status WaitForReplicaCount(const string& tablet_id, int expected_count,
+ TabletLocationsPB* locations) {
+ while (true) {
+ master::CatalogManager* catalog = cluster_->mini_master()->master()->catalog_manager();
+ Status s;
+ {
+ master::CatalogManager::ScopedLeaderSharedLock l(catalog);
+ RETURN_NOT_OK(l.first_failed_status());
+ s = catalog->GetTabletLocations(tablet_id, locations);
+ }
+ if (s.ok() && locations->replicas_size() == expected_count) {
+ return Status::OK();
+ }
+
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ }
+ }
+
protected:
gscoped_ptr<MiniCluster> cluster_;
Schema schema_;
@@ -142,13 +161,13 @@
CreateTabletForTesting(cluster_->mini_master(), "fake-table", schema_, &tablet_id_1);
TabletLocationsPB locs;
- ASSERT_OK(cluster_->WaitForReplicaCount(tablet_id_1, 1, &locs));
+ ASSERT_OK(WaitForReplicaCount(tablet_id_1, 1, &locs));
ASSERT_EQ(1, locs.replicas_size());
LOG(INFO) << "Tablet successfully reported on " << locs.replicas(0).ts_info().permanent_uuid();
// Add another tablet, make sure it is reported via incremental.
CreateTabletForTesting(cluster_->mini_master(), "fake-table2", schema_, &tablet_id_2);
- ASSERT_OK(cluster_->WaitForReplicaCount(tablet_id_2, 1, &locs));
+ ASSERT_OK(WaitForReplicaCount(tablet_id_2, 1, &locs));
// Shut down the whole system, bring it back up, and make sure the tablets
// are reported.
@@ -156,8 +175,8 @@
ASSERT_OK(cluster_->mini_master()->Restart());
ASSERT_OK(ts->Start());
- ASSERT_OK(cluster_->WaitForReplicaCount(tablet_id_1, 1, &locs));
- ASSERT_OK(cluster_->WaitForReplicaCount(tablet_id_2, 1, &locs));
+ ASSERT_OK(WaitForReplicaCount(tablet_id_1, 1, &locs));
+ ASSERT_OK(WaitForReplicaCount(tablet_id_2, 1, &locs));
// TODO: KUDU-870: once the master supports detecting failed/lost replicas,
// we should add a test case here which removes or corrupts metadata, restarts
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 7d58038..88382df 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -1713,23 +1713,35 @@
return Status::OK();
}
-bool CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableInfo> *table) {
+Status CatalogManager::GetTableInfo(const string& table_id, scoped_refptr<TableInfo> *table) {
+ leader_lock_.AssertAcquiredForReading();
+ RETURN_NOT_OK(CheckOnline());
+
shared_lock<LockType> l(lock_);
*table = FindPtrOrNull(table_ids_map_, table_id);
- return *table != nullptr;
+ return Status::OK();
}
-void CatalogManager::GetAllTables(std::vector<scoped_refptr<TableInfo> > *tables) {
+Status CatalogManager::GetAllTables(std::vector<scoped_refptr<TableInfo>>* tables) {
+ leader_lock_.AssertAcquiredForReading();
+ RETURN_NOT_OK(CheckOnline());
+
tables->clear();
shared_lock<LockType> l(lock_);
for (const TableInfoMap::value_type& e : table_ids_map_) {
tables->push_back(e.second);
}
+
+ return Status::OK();
}
-bool CatalogManager::TableNameExists(const string& table_name) {
+Status CatalogManager::TableNameExists(const string& table_name, bool* exists) {
+ leader_lock_.AssertAcquiredForReading();
+ RETURN_NOT_OK(CheckOnline());
+
shared_lock<LockType> l(lock_);
- return table_names_map_.find(table_name) != table_names_map_.end();
+ *exists = ContainsKey(table_names_map_, table_name);
+ return Status::OK();
}
void CatalogManager::NotifyTabletDeleteSuccess(const string& permanent_uuid,
diff --git a/src/kudu/master/catalog_manager.h b/src/kudu/master/catalog_manager.h
index e348d98..29119f5 100644
--- a/src/kudu/master/catalog_manager.h
+++ b/src/kudu/master/catalog_manager.h
@@ -467,18 +467,23 @@
// given output stream. This is verbose, meant for debugging.
void DumpState(std::ostream* out) const;
- // Return true if the table with the specified ID exists,
- // and set the table pointer to the TableInfo object
+ // Retrieve a table by ID, or null if no such table exists. May fail if the
+ // catalog manager is not yet running. Caller must hold leader_lock_.
+ //
// NOTE: This should only be used by tests or web-ui
- bool GetTableInfo(const std::string& table_id, scoped_refptr<TableInfo> *table);
+ Status GetTableInfo(const std::string& table_id, scoped_refptr<TableInfo> *table);
- // Return all the available TableInfo, which also may include not running tables
+ // Retrieve all known tables, even those that are not running. May fail if
+ // the catalog manager is not yet running. Caller must hold leader_lock_.
+ //
// NOTE: This should only be used by tests or web-ui
- void GetAllTables(std::vector<scoped_refptr<TableInfo> > *tables);
+ Status GetAllTables(std::vector<scoped_refptr<TableInfo>>* tables);
- // Return true if the specified table name exists
+ // Check if a table exists by name, setting 'exist' appropriately. May fail
+ // if the catalog manager is not yet running. Caller must hold leader_lock_.
+ //
// NOTE: This should only be used by tests
- bool TableNameExists(const std::string& table_name);
+ Status TableNameExists(const std::string& table_name, bool* exists);
// Let the catalog manager know that the the given tablet server successfully
// deleted the specified tablet.
diff --git a/src/kudu/master/master-path-handlers.cc b/src/kudu/master/master-path-handlers.cc
index d3f1c0f..d97281c 100644
--- a/src/kudu/master/master-path-handlers.cc
+++ b/src/kudu/master/master-path-handlers.cc
@@ -83,9 +83,15 @@
void MasterPathHandlers::HandleCatalogManager(const Webserver::WebRequest& req,
stringstream* output) {
+ CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+ if (!l.first_failed_status().ok()) {
+ *output << "Master is not ready: " << l.first_failed_status().ToString();
+ return;
+ }
+
*output << "<h1>Tables</h1>\n";
- std::vector<scoped_refptr<TableInfo> > tables;
+ std::vector<scoped_refptr<TableInfo>> tables;
master_->catalog_manager()->GetAllTables(&tables);
*output << "<table class='table table-striped'>\n";
@@ -132,8 +138,20 @@
return;
}
+ CatalogManager::ScopedLeaderSharedLock l(master_->catalog_manager());
+ if (!l.first_failed_status().ok()) {
+ *output << "Master is not ready: " << l.first_failed_status().ToString();
+ return;
+ }
+
scoped_refptr<TableInfo> table;
- if (!master_->catalog_manager()->GetTableInfo(table_id, &table)) {
+ Status s = master_->catalog_manager()->GetTableInfo(table_id, &table);
+ if (!s.ok()) {
+ *output << "Master is not ready: " << s.ToString();
+ return;
+ }
+
+ if (!table) {
*output << "Table not found";
return;
}
@@ -160,7 +178,7 @@
*output << "</table>\n";
SchemaFromPB(l.data().pb.schema(), &schema);
- Status s = PartitionSchema::FromPB(l.data().pb.partition_schema(), schema, &partition_schema);
+ s = PartitionSchema::FromPB(l.data().pb.partition_schema(), schema, &partition_schema);
if (!s.ok()) {
*output << "Unable to decode partition schema: " << s.ToString();
return;