blob: 8c186a54a8039995e036e99dce0aebe07cded71a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/integration-tests/mini_cluster_fs_inspector.h"
#include <algorithm>
#include <set>
#include <glog/logging.h>
#include "kudu/consensus/metadata.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
namespace kudu {
namespace itest {
using cluster::MiniCluster;
using std::set;
using std::string;
using std::vector;
using consensus::ConsensusMetadataPB;
using env_util::ListFilesInDir;
using strings::Substitute;
using tablet::TabletDataState;
using tablet::TabletSuperBlockPB;
MiniClusterFsInspector::MiniClusterFsInspector(MiniCluster* cluster)
: cluster_(cluster),
env_(cluster->env()) {}
// Counts the entries that ListFilesInDir() reports for `path` and for which
// MatchPattern(entry, pattern) is true. An empty `pattern` accepts every entry.
// If the listing fails, the error is not propagated: the function returns 0.
// NOTE(review): the lines closing the lambda and the function body are not
// present in this view of the file.
int MiniClusterFsInspector::CountFilesInDir(const string& path,
StringPiece pattern) {
vector<string> entries;
Status s = ListFilesInDir(env_, path, &entries);
// A failed listing is reported to the caller as "zero files".
if (!s.ok()) return 0;
// Inside the lambda, `s` is an entry name; it shadows the Status `s` above.
return std::count_if(entries.begin(), entries.end(), [&](const string& s) {
return pattern.empty() || MatchPattern(s, pattern);
// Returns the WAL directory path for tablet server `ts_idx`: the value of
// cluster_->WalRootForTS(ts_idx) joined with FsManager::kWalDirName.
string MiniClusterFsInspector::WalDirForTS(int ts_idx) const {
return JoinPathSegments(cluster_->WalRootForTS(ts_idx), FsManager::kWalDirName);
// Returns the total number of files found in the subdirectories of tablet
// server `ts_idx`'s WAL directory (one subdirectory per listed entry).
// The top-level listing is wrapped in CHECK_OK, so a failure to list the WAL
// directory is treated as fatal rather than returned to the caller.
int MiniClusterFsInspector::CountWALFilesOnTS(int ts_idx) {
string ts_wal_dir = WalDirForTS(ts_idx);
// Every entry of the WAL directory is treated as a per-tablet subdirectory.
vector<string> tablets;
CHECK_OK(ListFilesInDir(env_, ts_wal_dir, &tablets));
int total_segments = 0;
for (const string& tablet : tablets) {
string tablet_wal_dir = JoinPathSegments(ts_wal_dir, tablet);
// Only a path is passed here; presumably the declaration gives `pattern` a
// default value -- confirm against the header.
total_segments += CountFilesInDir(tablet_wal_dir);
return total_segments;
// Returns the union of ListTabletsOnTS(i) over every tablet server index in
// [0, cluster_->num_tablet_servers()). The names are collected in a std::set,
// so the returned vector contains each name once, in sorted order.
vector<string> MiniClusterFsInspector::ListTablets() {
set<string> tablets;
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
auto ts_tablets = ListTabletsOnTS(i);
tablets.insert(ts_tablets.begin(), ts_tablets.end());
// Copy the de-duplicated, ordered names back into a vector for the caller.
return vector<string>(tablets.begin(), tablets.end());
// Returns the entries that ListFilesInDir() reports for a directory located
// under tablet server `ts_idx`'s WAL root. The listing is wrapped in CHECK_OK,
// so a listing failure is treated as fatal rather than returned.
// NOTE(review): the JoinPathSegments() call below ends on a comma; its second
// argument is missing in this view. Presumably it is
// FsManager::kTabletMetadataDirName (the name used in CheckNoDataOnTS) --
// confirm against the upstream file.
vector<string> MiniClusterFsInspector::ListTabletsOnTS(int ts_idx) {
string meta_dir = JoinPathSegments(cluster_->WalRootForTS(ts_idx),
vector<string> tablets;
CHECK_OK(ListFilesInDir(env_, meta_dir, &tablets));
return tablets;
// Returns the entries that ListFilesInDir() reports for tablet server
// `ts_idx`'s WAL directory (see WalDirForTS()). The listing is wrapped in
// CHECK_OK, so a listing failure is treated as fatal rather than returned.
vector<string> MiniClusterFsInspector::ListTabletsWithDataOnTS(int ts_idx) {
vector<string> tablets;
CHECK_OK(ListFilesInDir(env_, WalDirForTS(ts_idx), &tablets));
return tablets;
// Counts, via CountFilesInDir(), the files matching `pattern` in the WAL
// subdirectory of tablet `tablet_id` on tablet server `ts_idx`.
// Returns 0 when that subdirectory does not exist.
int MiniClusterFsInspector::CountFilesInWALDirForTS(
int ts_idx,
const string& tablet_id,
StringPiece pattern) {
string tablet_wal_dir = JoinPathSegments(WalDirForTS(ts_idx), tablet_id);
// A missing per-tablet WAL directory is reported as zero files.
if (!env_->FileExists(tablet_wal_dir)) {
return 0;
return CountFilesInDir(tablet_wal_dir, pattern);
// Returns true iff ReadConsensusMetadataOnTS() succeeds for tablet `tablet_id`
// on tablet server `ts_idx`. Any non-OK status (not only a missing file)
// yields false. The protobuf that was read is discarded.
bool MiniClusterFsInspector::DoesConsensusMetaExistForTabletOnTS(int ts_idx,
const string& tablet_id) {
ConsensusMetadataPB cmeta_pb;
Status s = ReadConsensusMetadataOnTS(ts_idx, tablet_id, &cmeta_pb);
return s.ok();
// Returns the sum, over every tablet server in the cluster, of the file count
// that CountFilesInDir() reports for a directory under that server's WAL root.
// NOTE(review): the CountFilesInDir(JoinPathSegments(...)) call below is cut
// off after its first argument in this view. Presumably the joined segment is
// FsManager::kTabletMetadataDirName, given the comment about the metadata
// directory -- confirm against the upstream file.
int MiniClusterFsInspector::CountReplicasInMetadataDirs() {
// Rather than using FsManager's functionality for listing blocks, we just
// manually list the contents of the metadata directory. This is because we're
// using an external minicluster, and initializing a new FsManager to point at
// the running tablet servers isn't easy.
int count = 0;
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
count += CountFilesInDir(JoinPathSegments(cluster_->WalRootForTS(i),
return count;
// Checks that tablet server `ts_idx` has no tablet data left under its WAL
// root. Three locations are examined in order; the first non-empty one
// produces an IllegalState status whose second argument is the WAL root:
//   1. the FsManager::kTabletMetadataDirName directory,
//   2. the per-tablet WAL directories (via CountWALFilesOnTS()),
//   3. the FsManager::kConsensusMetadataDirName directory.
// Returns Status::OK() when all three counts are zero.
// CountFilesInDir() returns 0 when a directory cannot be listed, so an
// unlistable metadata directory is treated the same as an empty one.
Status MiniClusterFsInspector::CheckNoDataOnTS(int ts_idx) {
const string& wal_root = cluster_->WalRootForTS(ts_idx);
if (CountFilesInDir(JoinPathSegments(wal_root, FsManager::kTabletMetadataDirName)) > 0) {
return Status::IllegalState("tablet metadata blocks still exist", wal_root);
if (CountWALFilesOnTS(ts_idx) > 0) {
return Status::IllegalState("wals still exist", wal_root);
if (CountFilesInDir(JoinPathSegments(wal_root, FsManager::kConsensusMetadataDirName)) > 0) {
return Status::IllegalState("consensus metadata still exists", wal_root);
// The second ';' below is a harmless empty statement.
return Status::OK();;
// Iterates over every tablet server index in the cluster and returns
// Status::OK() after the loop.
// NOTE(review): the body of the loop is missing in this view. Presumably it
// calls CheckNoDataOnTS(i) and returns early on a non-OK status -- confirm
// against the upstream file.
Status MiniClusterFsInspector::CheckNoData() {
for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
// The second ';' below is a harmless empty statement.
return Status::OK();;
// Returns the path of the superblock file for tablet `tablet_id` on tablet
// server `ts_idx`: a metadata directory under the server's WAL root, joined
// with the tablet id.
// NOTE(review): the JoinPathSegments() call that builds `meta_dir` ends on a
// comma; its second argument is missing in this view. Presumably it is
// FsManager::kTabletMetadataDirName -- confirm against the upstream file.
string MiniClusterFsInspector::GetTabletSuperBlockPathOnTS(int ts_idx,
const string& tablet_id) const {
string meta_dir = JoinPathSegments(cluster_->WalRootForTS(ts_idx),
return JoinPathSegments(meta_dir, tablet_id);
// Reads the superblock of tablet `tablet_id` on tablet server `ts_idx` into
// `*sb`. The file path comes from GetTabletSuperBlockPathOnTS(); the status of
// pb_util::ReadPBContainerFromPath() is returned unchanged.
Status MiniClusterFsInspector::ReadTabletSuperBlockOnTS(int ts_idx,
const string& tablet_id,
TabletSuperBlockPB* sb) {
const auto& sb_path = GetTabletSuperBlockPathOnTS(ts_idx, tablet_id);
return pb_util::ReadPBContainerFromPath(env_, sb_path, sb);
// Returns a timestamp obtained for the superblock file of tablet `tablet_id`
// on tablet server `ts_idx`.
// NOTE(review): the start of the call that fills `timestamp` is missing in
// this view; only its trailing arguments are visible. Judging by the function
// name it is presumably a CHECK_OK-wrapped modification-time query on env_ --
// confirm against the upstream file.
int64_t MiniClusterFsInspector::GetTabletSuperBlockMTimeOrDie(int ts_idx,
const string& tablet_id) {
// Declared without an initializer; written through the pointer passed below.
int64_t timestamp;
GetTabletSuperBlockPathOnTS(ts_idx, tablet_id), &timestamp));
return timestamp;
// Returns the path of the consensus metadata file for tablet `tablet_id` on
// tablet server `ts_idx`:
//   <WAL root>/<FsManager::kConsensusMetadataDirName>/<tablet_id>
string MiniClusterFsInspector::GetConsensusMetadataPathOnTS(int ts_idx,
const string& tablet_id) const {
string wal_root = cluster_->WalRootForTS(ts_idx);
string cmeta_dir = JoinPathSegments(wal_root, FsManager::kConsensusMetadataDirName);
return JoinPathSegments(cmeta_dir, tablet_id);
// Reads the consensus metadata of tablet `tablet_id` on tablet server `ts_idx`
// into `*cmeta_pb`.
// Returns NotFound (with the file path) if the file does not exist; otherwise
// returns the status of pb_util::ReadPBContainerFromPath() unchanged.
Status MiniClusterFsInspector::ReadConsensusMetadataOnTS(int ts_idx,
const string& tablet_id,
ConsensusMetadataPB* cmeta_pb) {
auto cmeta_path = GetConsensusMetadataPathOnTS(ts_idx, tablet_id);
// Existence is checked first so that a missing file maps to NotFound.
if (!env_->FileExists(cmeta_path)) {
return Status::NotFound("Consensus metadata file not found", cmeta_path);
return pb_util::ReadPBContainerFromPath(env_, cmeta_path, cmeta_pb);
// Writes `cmeta_pb` to the consensus metadata file of tablet `tablet_id` on
// tablet server `ts_idx`, and returns the status of
// pb_util::WritePBContainerToPath() unchanged.
// The write is requested with pb_util::OVERWRITE and pb_util::NO_SYNC.
Status MiniClusterFsInspector::WriteConsensusMetadataOnTS(
int ts_idx,
const string& tablet_id,
const ConsensusMetadataPB& cmeta_pb) {
auto cmeta_path = GetConsensusMetadataPathOnTS(ts_idx, tablet_id);
return pb_util::WritePBContainerToPath(env_, cmeta_path, cmeta_pb,
pb_util::OVERWRITE, pb_util::NO_SYNC);
// Checks that the tablet_data_state recorded in the superblock of tablet
// `tablet_id` on tablet server `ts_idx` is one of `allowed_states`.
// Returns:
//   - the superblock read error, if the superblock cannot be read;
//   - Status::OK() if the recorded state is in `allowed_states`;
//   - IllegalState describing the mismatch otherwise.
// NOTE(review): two pieces are missing in this view: the body of the
// `for (auto state : allowed_states)` loop (presumably it appends each state's
// name to `state_names`), and the remaining arguments of the final
// Substitute() call -- confirm both against the upstream file.
Status MiniClusterFsInspector::CheckTabletDataStateOnTS(
int ts_idx,
const string& tablet_id,
const vector<TabletDataState>& allowed_states) {
TabletSuperBlockPB sb;
RETURN_NOT_OK(ReadTabletSuperBlockOnTS(ts_idx, tablet_id, &sb));
if (std::find(allowed_states.begin(), allowed_states.end(), sb.tablet_data_state()) !=
allowed_states.end()) {
return Status::OK();
// Mismatch: build a readable list of the states that would have been accepted.
vector<string> state_names;
for (auto state : allowed_states) {
string expected_str = JoinStrings(state_names, ",");
// With more than one acceptable state, the list is prefixed with "one of: ".
if (state_names.size() > 1) {
expected_str = "one of: " + expected_str;
return Status::IllegalState(Substitute("State $0 unexpected, expected $1",
// Repeatedly calls CheckNoData() until it succeeds or `timeout` has elapsed.
// Returns Status::OK() on the first successful check; otherwise TimedOut,
// carrying the string form of the last CheckNoData() status.
// The check always runs at least once, because the deadline is only tested
// after a failed check.
// NOTE(review): no pause between attempts is visible in this view; if the
// upstream file has none either, this loop polls without sleeping.
Status MiniClusterFsInspector::WaitForNoData(const MonoDelta& timeout) {
MonoTime deadline = MonoTime::Now() + timeout;
Status s;
while (true) {
s = CheckNoData();
if (s.ok()) return Status::OK();
if (deadline < MonoTime::Now()) {
return Status::TimedOut("Timed out waiting for no data", s.ToString());
// Repeatedly calls CheckNoDataOnTS(ts_idx) until it succeeds or `timeout` has
// elapsed. Returns Status::OK() on the first successful check; otherwise
// TimedOut, carrying the string form of the last CheckNoDataOnTS() status.
// The check always runs at least once, because the deadline is only tested
// after a failed check.
// NOTE(review): no pause between attempts is visible in this view; if the
// upstream file has none either, this loop polls without sleeping.
Status MiniClusterFsInspector::WaitForNoDataOnTS(int ts_idx, const MonoDelta& timeout) {
MonoTime deadline = MonoTime::Now() + timeout;
Status s;
while (true) {
s = CheckNoDataOnTS(ts_idx);
if (s.ok()) return Status::OK();
if (deadline < MonoTime::Now()) {
return Status::TimedOut("Timed out waiting for no data", s.ToString());
// Waits until the WAL directory of tablet `tablet_id` on tablet server
// `ts_idx` holds at least `count` files, as reported by
// CountFilesInWALDirForTS(), or until `timeout` has elapsed.
// Returns Status::OK() once enough files are seen; otherwise TimedOut, with a
// message giving the tablet, server index, required count and last count seen.
// The count is always taken at least once before the deadline is tested.
// NOTE(review): no pause between attempts is visible in this view; if the
// upstream file has none either, this loop polls without sleeping.
Status MiniClusterFsInspector::WaitForMinFilesInTabletWalDirOnTS(int ts_idx,
const string& tablet_id,
int count,
const MonoDelta& timeout) {
int seen = 0;
MonoTime deadline = MonoTime::Now() + timeout;
while (true) {
// Only three arguments are passed here; presumably the declaration gives
// `pattern` a default value -- confirm against the header.
seen = CountFilesInWALDirForTS(ts_idx, tablet_id);
if (seen >= count) return Status::OK();
if (deadline < MonoTime::Now()) {
return Status::TimedOut(Substitute("Timed out waiting for number of WAL segments on tablet $0 "
"on TS $1 to be $2. Found $3",
tablet_id, ts_idx, count, seen));
// Waits until CountReplicasInMetadataDirs() equals `expected` exactly, or
// until `timeout` has elapsed.
// Returns Status::OK() on an exact match; otherwise TimedOut, with a message
// giving the expected count and the last count found.
// The count is always taken at least once before the deadline is tested.
// NOTE(review): no pause between attempts is visible in this view; if the
// upstream file has none either, this loop polls without sleeping.
Status MiniClusterFsInspector::WaitForReplicaCount(int expected, const MonoDelta& timeout) {
const MonoTime deadline = MonoTime::Now() + timeout;
// Assigned on every loop iteration before it is read.
int found;
while (true) {
found = CountReplicasInMetadataDirs();
if (found == expected) {
return Status::OK();
if (MonoTime::Now() > deadline) {
return Status::TimedOut(
Substitute("Timed out waiting for a total replica count of $0. "
"Found $1 replicas", expected, found));
// Repeatedly calls CheckTabletDataStateOnTS() for tablet `tablet_id` on tablet
// server `ts_idx` until it succeeds or `timeout` has elapsed.
// Returns Status::OK() on the first successful check. Once the deadline has
// passed, the loop is left via `break` and a TimedOut status is built that
// includes the elapsed time since `start`.
// NOTE(review): the final Substitute() call is cut off after its first value
// argument in this view; the argument for "$1" is presumably the string form
// of the last check status `s` -- confirm against the upstream file. No pause
// between attempts is visible here either.
Status MiniClusterFsInspector::WaitForTabletDataStateOnTS(
int ts_idx,
const string& tablet_id,
const vector<TabletDataState>& expected_states,
const MonoDelta& timeout) {
// `start` is kept separately so the elapsed time can be reported on timeout.
MonoTime start = MonoTime::Now();
MonoTime deadline = start + timeout;
Status s;
while (true) {
s = CheckTabletDataStateOnTS(ts_idx, tablet_id, expected_states);
if (s.ok()) return Status::OK();
if (MonoTime::Now() > deadline) break;
return Status::TimedOut(Substitute("Timed out after $0 waiting for correct tablet state: $1",
(MonoTime::Now() - start).ToString(),
// Waits until the file names in the WAL directory of tablet `tablet_id` on
// tablet server `ts_idx` satisfy both conditions, or until `timeout` elapses:
//   - every string in `substrings_required` occurs as a substring of at least
//     one entry name, and
//   - no string in `substrings_disallowed` occurs as a substring of any entry
//     name.
// Returns Status::OK() as soon as both hold. Otherwise returns TimedOut; its
// detail text combines the reasons gathered on the last attempt with the
// sorted list of entries seen.
// The directory is always examined at least once before the deadline is tested.
// NOTE(review): several lines appear to be missing in this view (closing
// braces, and possibly `break` statements or a pause between attempts), so the
// exact loop exits cannot be confirmed from here.
Status MiniClusterFsInspector::WaitForFilePatternInTabletWalDirOnTs(
int ts_idx, const string& tablet_id,
const vector<string>& substrings_required,
const vector<string>& substrings_disallowed,
const MonoDelta& timeout) {
// NOTE(review): this outer `s` is never used in the visible code; it is
// shadowed by the `Status s` declared inside the loop.
Status s;
MonoTime deadline = MonoTime::Now() + timeout;
string tablet_wal_dir = JoinPathSegments(WalDirForTS(ts_idx), tablet_id);
string error_msg;
vector<string> entries;
while (true) {
// NOTE(review): the listing status is stored but never inspected in the
// visible code, so a failed listing is not distinguished from a successful one.
Status s = ListFilesInDir(env_, tablet_wal_dir, &entries);
// Sorted so the entry list in the timeout message has a stable order.
std::sort(entries.begin(), entries.end());
// The explanation is rebuilt from scratch on every attempt.
error_msg = "";
// Pass 1: each required substring must occur in at least one entry name.
bool any_missing_required = false;
for (const string& required_filter : substrings_required) {
bool filter_matched = false;
for (const string& entry : entries) {
if (entry.find(required_filter) != string::npos) {
filter_matched = true;
if (!filter_matched) {
any_missing_required = true;
error_msg += "missing from substrings_required: " + required_filter + "; ";
// Pass 2: no entry name may contain any disallowed substring.
bool any_present_disallowed = false;
for (const string& entry : entries) {
// Stop scanning further entries once a disallowed match has been recorded.
if (any_present_disallowed) break;
for (const string& disallowed_filter : substrings_disallowed) {
if (entry.find(disallowed_filter) != string::npos) {
any_present_disallowed = true;
error_msg += Substitute("present from substrings_disallowed: $0 ($1)",
entry, disallowed_filter);
// Success requires both passes to be clean.
if (!any_missing_required && !any_present_disallowed) {
return Status::OK();
if (MonoTime::Now() > deadline) {
return Status::TimedOut(Substitute("Timed out waiting for file pattern on "
"tablet $0 on TS $1 in directory $2",
tablet_id, ts_idx, tablet_wal_dir),
error_msg + "entries: " + JoinStrings(entries, ", "));
} // namespace itest
} // namespace kudu