// blob: dc837a7af22c3aa14036a9a945053270e94163ad [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/integration-tests/log_verifier.h"
#include <cstdint>
#include <iterator>
#include <map>
#include <memory>
#include <optional>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/mini_cluster_fs_inspector.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/util/env.h"
#include "kudu/util/metrics.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"
using std::map;
using std::set;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
using cluster::MiniCluster;
using consensus::OpId;
using itest::MiniClusterFsInspector;
using log::LogReader;
// Creates a verifier bound to 'cluster'.
//
// Remembers the cluster pointer and the Env obtained from it, and builds the
// filesystem inspector that the scan methods use to find WAL directories and
// list tablets. 'cluster' is dereferenced here, so it must not be null.
LogVerifier::LogVerifier(cluster::MiniCluster* cluster)
    : cluster_(cluster),
      env_(cluster->env()) {
  inspector_.reset(new MiniClusterFsInspector(cluster));
  CHECK(inspector_ != nullptr);
}
// Nothing to release by hand: the destructor body is empty and members clean
// up after themselves.
LogVerifier::~LogVerifier() = default;
// Scans the WAL of tablet 'tablet_id' on tablet server 'ts_idx' and records,
// for every COMMIT entry encountered, the committed op's index -> term in
// '*index_to_term'.
//
// Each segment from the reader's segment snapshot is read entry by entry.
// Reading of a segment stops as soon as the entry reader reports end-of-file
// or corruption; any other read failure is returned to the caller.
//
// Returns Corruption if two COMMIT entries carry the same op index, or the
// error from opening the log reader if that fails.
Status LogVerifier::ScanForCommittedOpIds(int ts_idx, const string& tablet_id,
                                          map<int64_t, int64_t>* index_to_term) {
  const string tablet_wal_dir =
      JoinPathSegments(inspector_->WalDirForTS(ts_idx), tablet_id);

  shared_ptr<LogReader> log_reader;
  RETURN_NOT_OK(LogReader::Open(cluster_->ts_env(ts_idx),
                                tablet_wal_dir,
                                /*index*/nullptr,
                                tablet_id,
                                /*metric_entity*/nullptr,
                                /*file_cache*/nullptr,
                                &log_reader));

  log::SegmentSequence segments;
  log_reader->GetSegmentsSnapshot(&segments);

  // The entry object is shared across all reads.
  unique_ptr<log::LogEntryPB> entry;
  for (const auto& segment : segments) {
    log::LogEntryReader entry_reader(segment.get());
    for (;;) {
      Status read_status = entry_reader.ReadNextEntry(&entry);
      if (read_status.IsEndOfFile() || read_status.IsCorruption()) {
        // Done with this segment; move on to the next one.
        break;
      }
      RETURN_NOT_OK(read_status);

      if (entry->type() == log::COMMIT) {
        const auto& committed = entry->commit().commited_op_id();
        const bool newly_inserted =
            InsertIfNotPresent(index_to_term, committed.index(), committed.term());
        if (!newly_inserted) {
          // The index was already recorded by an earlier COMMIT entry.
          return Status::Corruption(Substitute(
              "Index $0 had two COMMIT messages: $1.$0 and $2.$0",
              committed.index(), committed.term(), (*index_to_term)[committed.index()]));
        }
      }
    }
  }
  return Status::OK();
}
// Finds the COMMIT entry with the highest op index in the WAL of tablet
// 'tablet_id' on tablet server 'ts_idx' and stores that op's index and term
// in '*commit_id'.
//
// Returns NotFound if the scan yields no COMMIT entries. If the scan itself
// fails, its status is returned with the scanned WAL directory prepended.
Status LogVerifier::ScanForHighestCommittedOpIdInLog(int ts_idx,
                                                     const string& tablet_id,
                                                     OpId* commit_id) {
  // Report the tablet's own WAL directory, i.e. the one that
  // ScanForCommittedOpIds() opens, rather than its parent returned by
  // WalDirForTS(), so that the error message points at the log that failed.
  const string wal_dir = JoinPathSegments(inspector_->WalDirForTS(ts_idx), tablet_id);

  map<int64_t, int64_t> index_to_term;
  RETURN_NOT_OK_PREPEND(ScanForCommittedOpIds(ts_idx, tablet_id, &index_to_term),
                        Substitute("Couldn't scan log in dir $0", wal_dir));
  if (index_to_term.empty()) {
    return Status::NotFound("no COMMITs in log");
  }

  // std::map is ordered by key, so the last element holds the highest index.
  const auto& highest = *index_to_term.rbegin();
  commit_id->set_index(highest.first);
  commit_id->set_term(highest.second);
  return Status::OK();
}
// For every tablet listed by the filesystem inspector, scans the WAL of each
// tablet server that has a WAL directory for that tablet and checks that any
// op index committed on more than one server carries the same term on all of
// them.
//
// Returns Corruption listing the term seen on each server for the first
// mismatching index, or the scan error (prefixed with the server index) if a
// server's log cannot be scanned. Returns OK when all tablets match.
Status LogVerifier::VerifyCommittedOpIdsMatch() {
  const int num_ts = cluster_->num_tablet_servers();

  for (const string& tablet_id : inspector_->ListTablets()) {
    LOG(INFO) << "Checking tablet " << tablet_id;

    // Union set of the op indexes seen on any server.
    set<int64_t> all_op_indexes;
    // For each server in the cluster, a map of [index->term].
    vector<map<int64_t, int64_t>> maps_by_ts(num_ts);

    // Gather the [index->term] map for each of the tablet servers
    // hosting this tablet.
    for (int i = 0; i < num_ts; i++) {
      const string wal_dir = JoinPathSegments(inspector_->WalDirForTS(i), tablet_id);
      // No WAL directory for this tablet on this server: leave its map empty.
      if (!env_->FileExists(wal_dir)) continue;

      map<int64_t, int64_t> index_to_term;
      RETURN_NOT_OK_PREPEND(ScanForCommittedOpIds(i, tablet_id, &index_to_term),
                            Substitute("Couldn't scan log for TS $0", i));
      for (const auto& index_term : index_to_term) {
        all_op_indexes.insert(index_term.first);
      }
      maps_by_ts[i] = std::move(index_to_term);
    }

    // Check that the terms match up across servers.
    vector<int64_t> committed_terms;
    committed_terms.reserve(num_ts);
    // Indicates that the op is not on this server.
    const int64_t kNotOnThisServer = -1;

    for (int64_t index : all_op_indexes) {
      committed_terms.clear();
      for (int ts = 0; ts < num_ts; ts++) {
        committed_terms.push_back(FindWithDefault(maps_by_ts[ts], index, kNotOnThisServer));
      }

      // 'committed_terms' entries should all be kNotOnThisServer or the same as each other.
      // Terms are 64-bit values, so the expected term is held at full width;
      // a narrower type would compare against a truncated term.
      std::optional<int64_t> expected_term;
      for (const int64_t this_ts_term : committed_terms) {
        if (this_ts_term == kNotOnThisServer) continue;  // this TS doesn't have the op
        if (!expected_term.has_value()) {
          // First server that has the op: its term becomes the reference.
          expected_term = this_ts_term;
          continue;
        }
        if (this_ts_term == *expected_term) continue;

        // Mismatch: report the term recorded for this index on every server.
        string err = Substitute("Mismatch found for index $0, [", index);
        for (int i = 0; i < num_ts; i++) {
          if (i != 0) err += ", ";
          strings::SubstituteAndAppend(&err, "T $0=$1",
                                       cluster_->UuidForTS(i),
                                       committed_terms[i]);
        }
        err += "]";
        return Status::Corruption(err);
      }
    }

    LOG(INFO) << "Verified matching terms for " << all_op_indexes.size() << " ops in tablet "
              << tablet_id;
  }
  return Status::OK();
}
} // namespace kudu