// blob: 6692102924bbc3925cc0c9095a39c5f259473377 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "kudu/consensus/log.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "kudu/clock/clock.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/async_util.h"
#include "kudu/util/env_util.h"
#include "kudu/util/file_cache.h"
#include "kudu/util/metrics.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
// Bring the 'server' and 'tablet' metric entity prototypes into scope:
// LogTestBase::SetUp() instantiates both of them through
// METRIC_ENTITY_server and METRIC_ENTITY_tablet.
METRIC_DECLARE_entity(server);
METRIC_DECLARE_entity(tablet);
namespace kudu {
namespace log {
// Identifiers shared by the log tests. Within this header only kTestTablet is
// referenced: it is the tablet id handed to Log::Open(), the name of the
// tablet's WAL directory, and the tablet id set on generated write requests.
// kTestTable and kTestTableId are not used here (presumably they are for the
// tests that include this header).
constexpr char kTestTable[] = "test-log-table";
constexpr char kTestTableId[] = "test-log-table-id";
constexpr char kTestTablet[] = "test-log-tablet";
// Readable values for the 'sync' argument of the LogTestBase append helpers.
// With APPEND_SYNC a helper waits for the append callback before returning;
// with APPEND_ASYNC it returns the status of submitting the append and the
// outcome is checked later inside the callback.
constexpr bool APPEND_SYNC = true;
constexpr bool APPEND_ASYNC = false;
// Appends one batch of 'count' NO_OP replicate messages to 'log' and waits
// for the append callback to fire before returning its status.
//
// Every message takes the current value of '*op_id' as its id and the current
// time of 'clock' as its timestamp; the index of '*op_id' is advanced by one
// per message, so on return it has grown by 'count'.
//
// If 'size' is non-null it is increased by the expected growth of the log.
// The increase is applied before the append is submitted, so it is visible
// even when the append fails.
inline Status AppendNoOpsToLogSync(clock::Clock* clock,
                                   Log* log,
                                   consensus::OpId* op_id,
                                   int count,
                                   size_t* size = nullptr) {
  const bool track_size = (size != nullptr);
  std::vector<consensus::ReplicateRefPtr> batch;

  for (int remaining = count; remaining > 0; --remaining) {
    consensus::ReplicateRefPtr msg_ref =
        make_scoped_refptr_replicate(new consensus::ReplicateMsg());
    consensus::ReplicateMsg* noop = msg_ref->get();
    noop->mutable_id()->CopyFrom(*op_id);
    noop->set_timestamp(clock->Now().ToUint64());
    noop->set_op_type(consensus::NO_OP);

    // Move the caller's OpId forward so the next message gets the next index.
    op_id->set_index(op_id->index() + 1);

    if (track_size) {
      // The Log wraps the entries in a LogEntryBatchPB, and each entry
      // carries a one-byte tag in addition to its own serialized size.
      *size += noop->ByteSizeLong() + 1;
    }
    batch.push_back(std::move(msg_ref));
  }

  if (track_size) {
    // Per-batch overhead: the entry batch header and the wrapper PB.
    *size += log::kEntryHeaderSizeV2 + 5;
  }

  Synchronizer done;
  RETURN_NOT_OK(log->AsyncAppendReplicates(batch, done.AsStatusCallback()));
  return done.Wait();
}
// Appends a single NO_OP to 'log' by delegating to AppendNoOpsToLogSync()
// with a batch size of one. 'op_id' and the optional 'size' are passed
// through unchanged and are updated exactly as that function describes.
inline Status AppendNoOpToLogSync(clock::Clock* clock,
                                  Log* log,
                                  consensus::OpId* op_id,
                                  size_t* size = nullptr) {
  constexpr int kSingleOp = 1;
  return AppendNoOpsToLogSync(clock, log, op_id, kSingleOp, size);
}
// The kinds of damage CorruptLogFile() can inflict on a log file.
enum CorruptionType {
  // Cut the file off at the given offset.
  TRUNCATE_FILE = 0,
  // Invert every bit of the byte at the given offset.
  FLIP_BYTE = 1,
};
// Corrupts the file at 'log_path' in place, either by truncating it at
// 'corruption_offset' (TRUNCATE_FILE) or by inverting all bits of the byte at
// 'corruption_offset' (FLIP_BYTE).
//
// The whole file is read into memory, modified, and written back through
// 'env'. Returns a non-OK status if the file cannot be read or rewritten.
//
// 'corruption_offset' must be non-negative. For TRUNCATE_FILE it must not
// exceed the file size, and for FLIP_BYTE it must be strictly smaller than
// the file size; violating these preconditions fails a CHECK.
inline Status CorruptLogFile(Env* env, const std::string& log_path,
                             CorruptionType type, int corruption_offset) {
  faststring buf;
  RETURN_NOT_OK_PREPEND(ReadFileToString(env, log_path, &buf),
                        "Couldn't read log");

  // Reject negative offsets explicitly rather than relying on the implicit
  // signed-to-unsigned conversion in the comparisons below.
  CHECK_GE(corruption_offset, 0);
  const size_t offset = static_cast<size_t>(corruption_offset);

  switch (type) {
    case TRUNCATE_FILE:
      // An offset past the end would request a larger buffer, i.e. the file
      // would not be truncated at all.
      CHECK_LE(offset, buf.size());
      buf.resize(offset);
      break;
    case FLIP_BYTE:
      CHECK_LT(offset, buf.size());
      buf[offset] ^= 0xff;
      break;
  }

  // Rewrite the file with the corrupt log.
  RETURN_NOT_OK_PREPEND(WriteStringToFile(env, Slice(buf), log_path),
                        "Couldn't rewrite corrupt log file");
  return Status::OK();
}
class LogTestBase : public KuduTest {
public:
typedef std::pair<int, int> DeltaId;
LogTestBase()
: schema_(GetSimpleTestSchema()),
log_anchor_registry_(new LogAnchorRegistry) {
}
void SetUp() override {
KuduTest::SetUp();
current_index_ = kStartIndex;
fs_manager_.reset(new FsManager(env_, FsManagerOpts(GetTestPath("fs_root"))));
metric_registry_.reset(new MetricRegistry);
metric_entity_tablet_ = METRIC_ENTITY_tablet.Instantiate(
metric_registry_.get(), "tablet");
metric_entity_server_ = METRIC_ENTITY_server.Instantiate(
metric_registry_.get(), "server");
// Capacity was chosen arbitrarily: high enough to cache multiple files, but
// low enough to see some eviction.
file_cache_.reset(new FileCache("log-test-base", env_, 5, metric_entity_server_));
ASSERT_OK(file_cache_->Init());
ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
ASSERT_OK(fs_manager_->Open());
clock_.reset(new clock::HybridClock(metric_entity_server_));
ASSERT_OK(clock_->Init());
}
void TearDown() override {
KuduTest::TearDown();
}
Status BuildLog() {
Schema schema_with_ids = SchemaBuilder(schema_).Build();
return Log::Open(options_,
fs_manager_.get(),
file_cache_.get(),
kTestTablet,
schema_with_ids,
0, // schema_version
metric_entity_tablet_.get(),
&log_);
}
void CheckRightNumberOfSegmentFiles(int expected) {
// Test that we actually have the expected number of files in the fs.
// We should have n segments plus '.' and '..'
std::vector<std::string> files;
ASSERT_OK(env_->GetChildren(
JoinPathSegments(fs_manager_->GetWalsRootDir(),
kTestTablet),
&files));
int count = 0;
for (const std::string& s : files) {
if (HasPrefixString(s, FsManager::kWalFileNamePrefix)) {
count++;
}
}
ASSERT_EQ(expected, count);
}
void EntriesToIdList(std::vector<uint32_t>* ids) {
for (const auto& entry : entries_) {
VLOG(2) << "Entry contents: " << pb_util::SecureDebugString(*entry);
if (entry->type() == REPLICATE) {
ids->push_back(entry->replicate().id().index());
}
}
}
static void CheckReplicateResult(const consensus::ReplicateRefPtr& msg, const Status& s) {
CHECK_OK(s);
}
// Appends a batch with size 2 (1 insert, 1 mutate) to the log.
Status AppendReplicateBatch(const consensus::OpId& opid,
bool sync = APPEND_SYNC) {
consensus::ReplicateRefPtr replicate =
make_scoped_refptr_replicate(new consensus::ReplicateMsg());
replicate->get()->set_op_type(consensus::WRITE_OP);
replicate->get()->mutable_id()->CopyFrom(opid);
replicate->get()->set_timestamp(clock_->Now().ToUint64());
tserver::WriteRequestPB* batch_request = replicate->get()->mutable_write_request();
RETURN_NOT_OK(SchemaToPB(schema_, batch_request->mutable_schema()));
AddTestRowToPB(RowOperationsPB::INSERT, schema_,
opid.index(),
0,
"this is a test insert",
batch_request->mutable_row_operations());
AddTestRowToPB(RowOperationsPB::UPDATE, schema_,
opid.index() + 1,
0,
"this is a test mutate",
batch_request->mutable_row_operations());
batch_request->set_tablet_id(kTestTablet);
return AppendReplicateBatch(replicate, sync);
}
// Appends the provided batch to the log.
Status AppendReplicateBatch(const consensus::ReplicateRefPtr& replicate,
bool sync = APPEND_SYNC) {
if (sync) {
Synchronizer s;
RETURN_NOT_OK(log_->AsyncAppendReplicates({ replicate }, s.AsStatusCallback()));
return s.Wait();
}
// AsyncAppendReplicates does not free the ReplicateMsg on completion, so we
// need to pass it through to our callback.
return log_->AsyncAppendReplicates(
{ replicate }, [replicate](const Status& s) { CheckReplicateResult(replicate, s); });
}
static void CheckCommitResult(const Status& s) {
CHECK_OK(s);
}
// Append a commit log entry containing one entry for the insert and one
// for the mutate.
Status AppendCommit(const consensus::OpId& original_opid,
bool sync = APPEND_SYNC) {
// The mrs id for the insert.
constexpr int kTargetMrsId = 1;
// The rs and delta ids for the mutate.
constexpr int kTargetRsId = 0;
constexpr int kTargetDeltaId = 0;
return AppendCommit(original_opid, kTargetMrsId, kTargetRsId, kTargetDeltaId, sync);
}
Status AppendCommit(const consensus::OpId& original_opid,
int mrs_id,
int rs_id,
int dms_id,
bool sync = APPEND_SYNC) {
consensus::CommitMsg commit;
commit.set_op_type(consensus::WRITE_OP);
commit.mutable_commited_op_id()->CopyFrom(original_opid);
tablet::TxResultPB* result = commit.mutable_result();
tablet::OperationResultPB* insert = result->add_ops();
insert->add_mutated_stores()->set_mrs_id(mrs_id);
tablet::OperationResultPB* mutate = result->add_ops();
tablet::MemStoreTargetPB* target = mutate->add_mutated_stores();
target->set_dms_id(dms_id);
target->set_rs_id(rs_id);
return AppendCommit(commit, sync);
}
// Append a COMMIT message for 'original_opid', but with results
// indicating that the associated writes failed due to
// "NotFound" errors.
Status AppendCommitWithNotFoundOpResults(const consensus::OpId& original_opid) {
consensus::CommitMsg commit;
commit.set_op_type(consensus::WRITE_OP);
commit.mutable_commited_op_id()->CopyFrom(original_opid);
tablet::TxResultPB* result = commit.mutable_result();
tablet::OperationResultPB* insert = result->add_ops();
StatusToPB(Status::NotFound("fake failed write"), insert->mutable_failed_status());
tablet::OperationResultPB* mutate = result->add_ops();
StatusToPB(Status::NotFound("fake failed write"), mutate->mutable_failed_status());
return AppendCommit(commit);
}
Status AppendCommit(const consensus::CommitMsg& commit,
bool sync = APPEND_SYNC) {
if (sync) {
Synchronizer s;
RETURN_NOT_OK(log_->AsyncAppendCommit(commit, s.AsStatusCallback()));
return s.Wait();
}
return log_->AsyncAppendCommit(commit,
[](const Status& s) { CheckCommitResult(s); });
}
// Appends 'count' ReplicateMsgs and the corresponding CommitMsgs to the log
Status AppendReplicateBatchAndCommitEntryPairsToLog(int count,
bool sync = APPEND_SYNC) {
for (int i = 0; i < count; i++) {
consensus::OpId opid = consensus::MakeOpId(1, current_index_);
RETURN_NOT_OK(AppendReplicateBatch(opid));
RETURN_NOT_OK(AppendCommit(opid, sync));
current_index_ += 1;
}
return Status::OK();
}
// Append a single NO_OP entry. Increments op_id by one.
// If non-NULL, and if the write is successful, 'size' is incremented
// by the size of the written operation.
Status AppendNoOp(consensus::OpId* op_id, size_t* size = nullptr) {
return AppendNoOpToLogSync(clock_.get(), log_.get(), op_id, size);
}
// Append a number of no-op entries to the log.
// Increments op_id's index by the number of records written.
// If non-NULL, 'size' keeps track of the size of the operations
// successfully written.
Status AppendNoOps(consensus::OpId* op_id, int num, size_t* size = nullptr) {
for (int i = 0; i < num; i++) {
RETURN_NOT_OK(AppendNoOp(op_id, size));
}
return Status::OK();
}
Status RollLog() {
return log_->AllocateSegmentAndRollOverForTests();
}
static std::string DumpSegmentsToString(const SegmentSequence& segments) {
std::string dump;
for (const scoped_refptr<ReadableLogSegment>& segment : segments) {
dump.append("------------\n");
strings::SubstituteAndAppend(&dump, "Segment: $0, Path: $1\n",
segment->header().sequence_number(), segment->path());
strings::SubstituteAndAppend(&dump, "Header: $0\n",
pb_util::SecureShortDebugString(segment->header()));
if (segment->HasFooter()) {
strings::SubstituteAndAppend(&dump, "Footer: $0\n",
pb_util::SecureShortDebugString(segment->footer()));
} else {
dump.append("Footer: None or corrupt.");
}
}
return dump;
}
protected:
enum {
kStartIndex = 1
};
const Schema schema_;
std::unique_ptr<FsManager> fs_manager_;
std::unique_ptr<MetricRegistry> metric_registry_;
std::unique_ptr<FileCache> file_cache_;
scoped_refptr<MetricEntity> metric_entity_tablet_;
scoped_refptr<MetricEntity> metric_entity_server_;
scoped_refptr<Log> log_;
int64_t current_index_;
LogOptions options_;
// Reusable entries vector that deletes the entries on destruction.
LogEntries entries_;
scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
std::unique_ptr<clock::Clock> clock_;
};
} // namespace log
} // namespace kudu